Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CWS] Move tags resolution from cgroup resolver to the tags one #30822

Merged
merged 11 commits into from
Nov 19, 2024
4 changes: 2 additions & 2 deletions pkg/security/probe/opts_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type Opts struct {
PathResolutionEnabled bool
// EnvsVarResolutionEnabled defines if environment variables resolution is enabled
EnvsVarResolutionEnabled bool
// TagsResolver will override the default one. Mainly here for tests.
TagsResolver tags.Resolver
// Tagger will override the default one. Mainly here for tests.
Tagger tags.Tagger
// SyscallsMonitorEnabled enable syscalls map monitor
SyscallsMonitorEnabled bool
// TTYFallbackEnabled enable the tty procfs fallback
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/probe/probe_ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,7 +2019,7 @@ func NewEBPFProbe(probe *Probe, config *config.Config, opts Opts, telemetry tele
resolversOpts := resolvers.Opts{
PathResolutionEnabled: probe.Opts.PathResolutionEnabled,
EnvVarsResolutionEnabled: probe.Opts.EnvsVarResolutionEnabled,
TagsResolver: probe.Opts.TagsResolver,
Tagger: probe.Opts.Tagger,
UseRingBuffer: useRingBuffers,
TTYFallbackEnabled: probe.Opts.TTYFallbackEnabled,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/probe/probe_ebpfless.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func NewEBPFLessProbe(probe *Probe, config *config.Config, opts Opts, telemetry
}

resolversOpts := resolvers.Opts{
TagsResolver: opts.TagsResolver,
Tagger: opts.Tagger,
}

p.Resolvers, err = resolvers.NewEBPFLessResolvers(config, p.statsdClient, probe.scrubber, resolversOpts, telemetry)
Expand Down
123 changes: 19 additions & 104 deletions pkg/security/resolvers/cgroup/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,56 @@ package cgroup

import (
"context"
"fmt"
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/simplelru"

cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/security/resolvers/tags"
"github.com/DataDog/datadog-agent/pkg/security/secl/model"
"github.com/DataDog/datadog-agent/pkg/security/seclog"
"github.com/DataDog/datadog-agent/pkg/security/utils"
)

// Event defines the cgroup event type
type Event int

const (
// WorkloadSelectorResolved is used to notify that a new cgroup with a resolved workload selector is ready
WorkloadSelectorResolved Event = iota
// CGroupDeleted is used to notify that a cgroup was deleted
CGroupDeleted
CGroupDeleted Event = iota + 1
// CGroupCreated new croup created
CGroupCreated
// CGroupMaxEvent is used cap the event ID
CGroupMaxEvent
)

// Listener is used to propagate CGroup events
type Listener func(workload *cgroupModel.CacheEntry)
// ResolverInterface defines the interface implemented by a cgroup resolver
type ResolverInterface interface {
Start(context.Context)
AddPID(*model.ProcessCacheEntry)
GetWorkload(string) (*cgroupModel.CacheEntry, bool)
DelPID(uint32)
DelPIDWithID(string, uint32)
Len() int
RegisterListener(Event, utils.Listener[*cgroupModel.CacheEntry]) error
}

// Resolver defines a cgroup monitor
type Resolver struct {
*utils.Notifier[Event, *cgroupModel.CacheEntry]
sync.RWMutex
workloads *simplelru.LRU[string, *cgroupModel.CacheEntry]
tagsResolver tags.Resolver
workloadsWithoutTags chan *cgroupModel.CacheEntry

listenersLock sync.Mutex
listeners map[Event][]Listener
workloads *simplelru.LRU[string, *cgroupModel.CacheEntry]
}

// NewResolver returns a new cgroups monitor
func NewResolver(tagsResolver tags.Resolver) (*Resolver, error) {
func NewResolver() (*Resolver, error) {
cr := &Resolver{
tagsResolver: tagsResolver,
workloadsWithoutTags: make(chan *cgroupModel.CacheEntry, 100),
listeners: make(map[Event][]Listener),
Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](),
}
workloads, err := simplelru.NewLRU(1024, func(_ string, value *cgroupModel.CacheEntry) {
value.CallReleaseCallback()
value.Deleted.Store(true)

cr.listenersLock.Lock()
defer cr.listenersLock.Unlock()
for _, l := range cr.listeners[CGroupDeleted] {
l(value)
}
cr.NotifyListeners(CGroupDeleted, value)
})
if err != nil {
return nil, err
Expand All @@ -75,45 +69,7 @@ func NewResolver(tagsResolver tags.Resolver) (*Resolver, error) {
}

// Start starts the goroutine of the SBOM resolver
func (cr *Resolver) Start(ctx context.Context) {
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

delayerTick := time.NewTicker(10 * time.Second)
defer delayerTick.Stop()

for {
select {
case <-ctx.Done():
return
case <-delayerTick.C:
select {
case workload := <-cr.workloadsWithoutTags:
cr.checkTags(workload)
default:
}

}
}
}()
}

// RegisterListener registers a CGroup event listener
func (cr *Resolver) RegisterListener(event Event, listener Listener) error {
if event >= CGroupMaxEvent || event < 0 {
return fmt.Errorf("invalid Event: %v", event)
}

cr.listenersLock.Lock()
defer cr.listenersLock.Unlock()

if cr.listeners != nil {
cr.listeners[event] = append(cr.listeners[event], listener)
} else {
return fmt.Errorf("a Listener was inserted before initialization")
}
return nil
func (cr *Resolver) Start(_ context.Context) {
}

// AddPID associates a container id and a pid which is expected to be the pid 1
Expand All @@ -139,48 +95,7 @@ func (cr *Resolver) AddPID(process *model.ProcessCacheEntry) {
// add the new CGroup to the cache
cr.workloads.Add(string(process.ContainerID), newCGroup)

// notify listeners
cr.listenersLock.Lock()
for _, l := range cr.listeners[CGroupCreated] {
l(newCGroup)
}
cr.listenersLock.Unlock()

// check the tags of this workload
cr.checkTags(newCGroup)
}

// checkTags checks if the tags of a workload were properly set
func (cr *Resolver) checkTags(workload *cgroupModel.CacheEntry) {
// check if the workload tags were found
if workload.NeedsTagsResolution() {
// this is a container, try to resolve its tags now
if err := cr.fetchTags(workload); err != nil || workload.NeedsTagsResolution() {
// push to the workloadsWithoutTags chan so that its tags can be resolved later
select {
case cr.workloadsWithoutTags <- workload:
default:
}
return
}
}

// notify listeners
cr.listenersLock.Lock()
defer cr.listenersLock.Unlock()
for _, l := range cr.listeners[WorkloadSelectorResolved] {
l(workload)
}
}

// fetchTags fetches tags for the provided workload
func (cr *Resolver) fetchTags(workload *cgroupModel.CacheEntry) error {
newTags, err := cr.tagsResolver.ResolveWithErr(string(workload.ContainerID))
if err != nil {
return fmt.Errorf("failed to resolve %s: %w", workload.ContainerID, err)
}
workload.SetTags(newTags)
return nil
cr.NotifyListeners(CGroupCreated, newCGroup)
}

// GetWorkload returns the workload referenced by the provided ID
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/resolvers/mount/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func TestMountResolver(t *testing.T) {
pid uint32 = 1
)

cr, _ := cgroup.NewResolver(nil)
cr, _ := cgroup.NewResolver()

// Create mount resolver
mr, _ := NewResolver(nil, cr, ResolverOpts{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/resolvers/opts_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import "github.com/DataDog/datadog-agent/pkg/security/resolvers/tags"
type Opts struct {
PathResolutionEnabled bool
EnvVarsResolutionEnabled bool
TagsResolver tags.Resolver
Tagger tags.Tagger
UseRingBuffer bool
TTYFallbackEnabled bool
}
15 changes: 5 additions & 10 deletions pkg/security/resolvers/resolvers_ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type EBPFResolvers struct {
ContainerResolver *container.Resolver
TimeResolver *ktime.Resolver
UserGroupResolver *usergroup.Resolver
TagsResolver tags.Resolver
TagsResolver *tags.LinuxResolver
DentryResolver *dentry.Resolver
ProcessResolver *process.EBPFResolver
NamespaceResolver *netns.Resolver
Expand Down Expand Up @@ -91,18 +91,13 @@ func NewEBPFResolvers(config *config.Config, manager *manager.Manager, statsdCli
}
}

var tagsResolver tags.Resolver
if opts.TagsResolver != nil {
tagsResolver = opts.TagsResolver
} else {
tagsResolver = tags.NewResolver(telemetry)
}

cgroupsResolver, err := cgroup.NewResolver(tagsResolver)
cgroupsResolver, err := cgroup.NewResolver()
if err != nil {
return nil, err
}

tagsResolver := tags.NewResolver(telemetry, opts.Tagger, cgroupsResolver)

userGroupResolver, err := usergroup.NewResolver(cgroupsResolver)
if err != nil {
return nil, err
Expand All @@ -112,7 +107,7 @@ func NewEBPFResolvers(config *config.Config, manager *manager.Manager, statsdCli
if err := cgroupsResolver.RegisterListener(cgroup.CGroupDeleted, sbomResolver.OnCGroupDeletedEvent); err != nil {
return nil, err
}
if err := cgroupsResolver.RegisterListener(cgroup.WorkloadSelectorResolved, sbomResolver.OnWorkloadSelectorResolvedEvent); err != nil {
if err := tagsResolver.RegisterListener(tags.WorkloadSelectorResolved, sbomResolver.OnWorkloadSelectorResolvedEvent); err != nil {
return nil, err
}
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/security/resolvers/resolvers_ebpfless.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ import (
// EBPFLessResolvers holds the list of the event attribute resolvers
type EBPFLessResolvers struct {
ContainerResolver *container.Resolver
TagsResolver tags.Resolver
TagsResolver *tags.LinuxResolver
ProcessResolver *process.EBPFLessResolver
HashResolver *hash.Resolver
}

// NewEBPFLessResolvers creates a new instance of EBPFLessResolvers
func NewEBPFLessResolvers(config *config.Config, statsdClient statsd.ClientInterface, scrubber *procutil.DataScrubber, opts Opts, telemetry telemetry.Component) (*EBPFLessResolvers, error) {
var tagsResolver tags.Resolver
if opts.TagsResolver != nil {
tagsResolver = opts.TagsResolver
} else {
tagsResolver = tags.NewResolver(telemetry)
cgroupsResolver, err := cgroup.NewResolver()
if err != nil {
return nil, err
}

tagsResolver := tags.NewResolver(telemetry, opts.Tagger, cgroupsResolver)
processOpts := process.NewResolverOpts()
processOpts.WithEnvsValue(config.Probe.EnvsWithValue)

Expand All @@ -48,10 +47,6 @@ func NewEBPFLessResolvers(config *config.Config, statsdClient statsd.ClientInter
return nil, err
}

cgroupsResolver, err := cgroup.NewResolver(tagsResolver)
if err != nil {
return nil, err
}
hashResolver, err := hash.NewResolver(config.RuntimeSecurity, statsdClient, cgroupsResolver)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/resolvers/resolvers_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewResolvers(config *config.Config, statsdClient statsd.ClientInterface, sc
return nil, err
}

tagsResolver := tags.NewResolver(telemetry)
tagsResolver := tags.NewResolver(telemetry, nil)

userSessionsResolver, err := usersessions.NewResolver(config.RuntimeSecurity)
if err != nil {
Expand Down
Loading
Loading