Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent improvements #14415

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions src/control/cmd/daos_agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,22 @@ func (rm refreshMinutes) Duration() time.Duration {

// Config defines the agent configuration.
type Config struct {
SystemName string `yaml:"name"`
AccessPoints []string `yaml:"access_points"`
ControlPort int `yaml:"port"`
RuntimeDir string `yaml:"runtime_dir"`
LogFile string `yaml:"log_file"`
LogLevel common.ControlLogLevel `yaml:"control_log_mask,omitempty"`
TransportConfig *security.TransportConfig `yaml:"transport_config"`
DisableCache bool `yaml:"disable_caching,omitempty"`
CacheExpiration refreshMinutes `yaml:"cache_expiration,omitempty"`
DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"`
ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"`
FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"`
TelemetryPort int `yaml:"telemetry_port,omitempty"`
TelemetryEnabled bool `yaml:"telemetry_enabled,omitempty"`
TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"`
SystemName string `yaml:"name"`
AccessPoints []string `yaml:"access_points"`
ControlPort int `yaml:"port"`
RuntimeDir string `yaml:"runtime_dir"`
LogFile string `yaml:"log_file"`
LogLevel common.ControlLogLevel `yaml:"control_log_mask,omitempty"`
CredentialConfig *security.CredentialConfig `yaml:"credential_config"`
TransportConfig *security.TransportConfig `yaml:"transport_config"`
DisableCache bool `yaml:"disable_caching,omitempty"`
CacheExpiration refreshMinutes `yaml:"cache_expiration,omitempty"`
DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"`
ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"`
FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"`
TelemetryPort int `yaml:"telemetry_port,omitempty"`
TelemetryEnabled bool `yaml:"telemetry_enabled,omitempty"`
TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"`
}

// TelemetryExportEnabled returns true if client telemetry export is enabled.
Expand Down Expand Up @@ -112,12 +113,13 @@ func LoadConfig(cfgPath string) (*Config, error) {
func DefaultConfig() *Config {
localServer := fmt.Sprintf("localhost:%d", build.DefaultControlPort)
return &Config{
SystemName: build.DefaultSystemName,
ControlPort: build.DefaultControlPort,
AccessPoints: []string{localServer},
RuntimeDir: defaultRuntimeDir,
LogFile: defaultLogFile,
LogLevel: common.DefaultControlLogLevel,
TransportConfig: security.DefaultAgentTransportConfig(),
SystemName: build.DefaultSystemName,
ControlPort: build.DefaultControlPort,
AccessPoints: []string{localServer},
RuntimeDir: defaultRuntimeDir,
LogFile: defaultLogFile,
LogLevel: common.DefaultControlLogLevel,
TransportConfig: security.DefaultAgentTransportConfig(),
CredentialConfig: &security.CredentialConfig{},
}
}
32 changes: 25 additions & 7 deletions src/control/cmd/daos_agent/config_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2023 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -46,6 +46,13 @@ control_log_mask: debug
disable_caching: true
cache_expiration: 30
disable_auto_evict: true
credential_config:
cache_lifetime: 10m
client_user_map:
1000:
user: frodo
group: baggins
groups: ["ringbearers"]
transport_config:
allow_insecure: true
exclude_fabric_ifaces: ["ib3"]
Expand Down Expand Up @@ -104,12 +111,13 @@ transport_config:
"without optional items": {
path: withoutOptCfg,
expResult: &Config{
SystemName: "shire",
AccessPoints: []string{"one:10001", "two:10001"},
ControlPort: 4242,
RuntimeDir: "/tmp/runtime",
LogFile: "/home/frodo/logfile",
LogLevel: common.DefaultControlLogLevel,
SystemName: "shire",
AccessPoints: []string{"one:10001", "two:10001"},
ControlPort: 4242,
RuntimeDir: "/tmp/runtime",
LogFile: "/home/frodo/logfile",
LogLevel: common.DefaultControlLogLevel,
CredentialConfig: &security.CredentialConfig{},
TransportConfig: &security.TransportConfig{
AllowInsecure: true,
CertificateConfig: DefaultConfig().TransportConfig.CertificateConfig,
Expand All @@ -132,6 +140,16 @@ transport_config:
DisableCache: true,
CacheExpiration: refreshMinutes(30 * time.Minute),
DisableAutoEvict: true,
CredentialConfig: &security.CredentialConfig{
CacheLifetime: time.Minute * 10,
ClientUserMap: map[uint32]*security.MappedClientUser{
1000: {
User: "frodo",
Group: "baggins",
Groups: []string{"ringbearers"},
},
},
},
TransportConfig: &security.TransportConfig{
AllowInsecure: true,
CertificateConfig: DefaultConfig().TransportConfig.CertificateConfig,
Expand Down
75 changes: 63 additions & 12 deletions src/control/cmd/daos_agent/infocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,22 @@ func getFabricScanFn(log logging.Logger, cfg *Config, scanner *hardware.FabricSc
}

type cacheItem struct {
sync.Mutex
sync.RWMutex
lastCached time.Time
refreshInterval time.Duration
}

// isStale returns true if the cache item is stale.
// NB: Should be run under a lock to protect lastCached.
func (ci *cacheItem) isStale() bool {
if ci.refreshInterval == 0 {
return false
}
return ci.lastCached.Add(ci.refreshInterval).Before(time.Now())
}

// isCached returns true if the cache item is cached.
// NB: Should be run under at least a read lock to protect lastCached.
func (ci *cacheItem) isCached() bool {
return !ci.lastCached.Equal(time.Time{})
}
Expand Down Expand Up @@ -130,16 +134,30 @@ func (ci *cachedAttachInfo) Key() string {
return sysAttachInfoKey(ci.system)
}

// NeedsRefresh checks whether the cached data needs to be refreshed.
func (ci *cachedAttachInfo) NeedsRefresh() bool {
// needsRefresh checks whether the cached data needs to be refreshed.
func (ci *cachedAttachInfo) needsRefresh() bool {
if ci == nil {
return false
}
return !ci.isCached() || ci.isStale()
}

// Refresh contacts the remote management server and refreshes the GetAttachInfo cache.
func (ci *cachedAttachInfo) Refresh(ctx context.Context) error {
// RefreshIfNeeded refreshes the cached data if it needs to be refreshed.
func (ci *cachedAttachInfo) RefreshIfNeeded(ctx context.Context) (func(), bool, error) {
if ci == nil {
return cache.NoopRelease, false, errors.New("cachedAttachInfo is nil")
}

ci.Lock()
if ci.needsRefresh() {
return ci.Unlock, true, ci.refresh(ctx)
}
return ci.Unlock, false, nil
}

// refresh implements the actual refresh logic.
// NB: Should be run under a lock.
func (ci *cachedAttachInfo) refresh(ctx context.Context) error {
if ci == nil {
return errors.New("cachedAttachInfo is nil")
}
Expand All @@ -155,6 +173,16 @@ func (ci *cachedAttachInfo) Refresh(ctx context.Context) error {
return nil
}

// Refresh contacts the remote management server and refreshes the GetAttachInfo cache.
func (ci *cachedAttachInfo) Refresh(ctx context.Context) (func(), error) {
if ci == nil {
return cache.NoopRelease, errors.New("cachedAttachInfo is nil")
}

ci.Lock()
return ci.Unlock, ci.refresh(ctx)
}

type cachedFabricInfo struct {
cacheItem
fetch fabricScanFn
Expand All @@ -172,17 +200,31 @@ func (cfi *cachedFabricInfo) Key() string {
return fabricKey
}

// NeedsRefresh indicates that the fabric information does not need to be refreshed unless it has
// needsRefresh indicates that the fabric information does not need to be refreshed unless it has
// never been populated.
func (cfi *cachedFabricInfo) NeedsRefresh() bool {
func (cfi *cachedFabricInfo) needsRefresh() bool {
if cfi == nil {
return false
}
return !cfi.isCached()
}

// Refresh scans the hardware for information about the fabric devices and caches the result.
func (cfi *cachedFabricInfo) Refresh(ctx context.Context) error {
// RefreshIfNeeded refreshes the cached fabric information if it needs to be refreshed.
func (cfi *cachedFabricInfo) RefreshIfNeeded(ctx context.Context) (func(), bool, error) {
if cfi == nil {
return cache.NoopRelease, false, errors.New("cachedFabricInfo is nil")
}

cfi.Lock()
if cfi.needsRefresh() {
return cfi.Unlock, true, cfi.refresh(ctx)
}
return cfi.Unlock, false, nil
}

// refresh implements the actual refresh logic.
// NB: Should be run under a lock.
func (cfi *cachedFabricInfo) refresh(ctx context.Context) error {
if cfi == nil {
return errors.New("cachedFabricInfo is nil")
}
Expand All @@ -197,6 +239,16 @@ func (cfi *cachedFabricInfo) Refresh(ctx context.Context) error {
return nil
}

// Refresh scans the hardware for information about the fabric devices and caches the result.
func (cfi *cachedFabricInfo) Refresh(ctx context.Context) (func(), error) {
if cfi == nil {
return cache.NoopRelease, errors.New("cachedFabricInfo is nil")
}

cfi.Lock()
return cfi.Unlock, cfi.refresh(ctx)
}

// InfoCache is a cache for the results of expensive operations needed by the agent.
type InfoCache struct {
log logging.Logger
Expand Down Expand Up @@ -350,15 +402,14 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get
}
createItem := func() (cache.Item, error) {
c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys))
cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo)
return cai, nil
return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil
}

item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem)
defer release()
if err != nil {
return nil, errors.Wrap(err, "getting attach info from cache")
}
defer release()

cai, ok := item.(*cachedAttachInfo)
if !ok {
Expand Down
36 changes: 28 additions & 8 deletions src/control/cmd/daos_agent/infocache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,31 @@ func TestAgent_cachedAttachInfo_Key(t *testing.T) {
}
}

func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
func TestAgent_cachedAttachInfo_RefreshIfNeeded(t *testing.T) {
log, buf := logging.NewTestLogger(t.Name())
defer test.ShowBufferOnFailure(t, buf)
mockClient := control.NewMockInvoker(log, &control.MockInvokerConfig{})

noopGetAttachInfo := func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, nil
}

for name, tc := range map[string]struct {
ai *cachedAttachInfo
expResult bool
}{
"nil": {},
"never cached": {
ai: newCachedAttachInfo(0, "test", nil, nil),
ai: newCachedAttachInfo(0, "test", mockClient, noopGetAttachInfo),
expResult: true,
},
"no refresh": {
ai: &cachedAttachInfo{
cacheItem: cacheItem{
lastCached: time.Now().Add(-time.Minute),
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
},
Expand All @@ -173,6 +183,8 @@ func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
lastCached: time.Now().Add(-time.Minute),
refreshInterval: time.Second,
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
expResult: true,
Expand All @@ -183,12 +195,15 @@ func TestAgent_cachedAttachInfo_NeedsRefresh(t *testing.T) {
lastCached: time.Now().Add(-time.Second),
refreshInterval: time.Minute,
},
rpcClient: mockClient,
fetch: noopGetAttachInfo,
lastResponse: &control.GetAttachInfoResp{},
},
},
} {
t.Run(name, func(t *testing.T) {
test.AssertEqual(t, tc.expResult, tc.ai.NeedsRefresh(), "")
_, refreshed, _ := tc.ai.RefreshIfNeeded(test.Context(t))
test.AssertEqual(t, tc.expResult, refreshed, "")
})
}
}
Expand Down Expand Up @@ -271,7 +286,8 @@ func TestAgent_cachedAttachInfo_Refresh(t *testing.T) {
}
}

err := ai.Refresh(test.Context(t))
release, err := ai.Refresh(test.Context(t))
release()

test.CmpErr(t, tc.expErr, err)

Expand Down Expand Up @@ -320,7 +336,7 @@ func TestAgent_cachedFabricInfo_Key(t *testing.T) {
}
}

func TestAgent_cachedFabricInfo_NeedsRefresh(t *testing.T) {
func TestAgent_cachedFabricInfo_RefreshIfNeeded(t *testing.T) {
for name, tc := range map[string]struct {
nilCache bool
cacheTime time.Time
Expand All @@ -342,11 +358,14 @@ func TestAgent_cachedFabricInfo_NeedsRefresh(t *testing.T) {

var cfi *cachedFabricInfo
if !tc.nilCache {
cfi = newCachedFabricInfo(log, nil)
cfi = newCachedFabricInfo(log, func(_ context.Context, _ ...string) (*NUMAFabric, error) {
return nil, nil
})
cfi.cacheItem.lastCached = tc.cacheTime
}

test.AssertEqual(t, tc.expResult, cfi.NeedsRefresh(), "")
_, refreshed, _ := cfi.RefreshIfNeeded(test.Context(t))
test.AssertEqual(t, tc.expResult, refreshed, "")
})
}
}
Expand Down Expand Up @@ -416,7 +435,8 @@ func TestAgent_cachedFabricInfo_Refresh(t *testing.T) {
}
}

err := cfi.Refresh(test.Context(t))
release, err := cfi.Refresh(test.Context(t))
release()

test.CmpErr(t, tc.expErr, err)

Expand Down
Loading
Loading