Skip to content

Commit

Permalink
Add Notifier type
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Nov 14, 2024
1 parent 18ce7fa commit a3247b6
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 104 deletions.
41 changes: 5 additions & 36 deletions pkg/security/resolvers/cgroup/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ package cgroup

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/simplelru"

cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/security/secl/model"
"github.com/DataDog/datadog-agent/pkg/security/seclog"
"github.com/DataDog/datadog-agent/pkg/security/utils"
)

// Event defines the cgroup event type
Expand All @@ -32,32 +32,23 @@ const (
CGroupMaxEvent
)

// Listener is used to propagate CGroup events
type Listener func(workload *cgroupModel.CacheEntry)

// Resolver defines a cgroup monitor
type Resolver struct {
*utils.Notifier[Event, *cgroupModel.CacheEntry]
sync.RWMutex
workloads *simplelru.LRU[string, *cgroupModel.CacheEntry]

listenersLock sync.Mutex
listeners map[Event][]Listener
}

// NewResolver returns a new cgroups monitor
func NewResolver() (*Resolver, error) {
cr := &Resolver{
listeners: make(map[Event][]Listener),
Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](),
}
workloads, err := simplelru.NewLRU(1024, func(_ string, value *cgroupModel.CacheEntry) {
value.CallReleaseCallback()
value.Deleted.Store(true)

cr.listenersLock.Lock()
defer cr.listenersLock.Unlock()
for _, l := range cr.listeners[CGroupDeleted] {
l(value)
}
cr.NotifyListener(CGroupDeleted, value)
})
if err != nil {
return nil, err
Expand All @@ -70,23 +61,6 @@ func NewResolver() (*Resolver, error) {
func (cr *Resolver) Start(_ context.Context) {
}

// RegisterListener registers a CGroup event listener
func (cr *Resolver) RegisterListener(event Event, listener Listener) error {
if event >= CGroupMaxEvent || event < 0 {
return fmt.Errorf("invalid Event: %v", event)
}

cr.listenersLock.Lock()
defer cr.listenersLock.Unlock()

if cr.listeners != nil {
cr.listeners[event] = append(cr.listeners[event], listener)
} else {
return fmt.Errorf("a Listener was inserted before initialization")
}
return nil
}

// AddPID associates a container id and a pid which is expected to be the pid 1
func (cr *Resolver) AddPID(process *model.ProcessCacheEntry) {
cr.Lock()
Expand All @@ -110,12 +84,7 @@ func (cr *Resolver) AddPID(process *model.ProcessCacheEntry) {
// add the new CGroup to the cache
cr.workloads.Add(string(process.ContainerID), newCGroup)

// notify listeners
cr.listenersLock.Lock()
for _, l := range cr.listeners[CGroupCreated] {
l(newCGroup)
}
cr.listenersLock.Unlock()
cr.NotifyListener(CGroupCreated, newCGroup)
}

// GetWorkload returns the workload referenced by the provided ID
Expand Down
31 changes: 6 additions & 25 deletions pkg/security/resolvers/tags/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"fmt"
"strings"
"sync"

coreconfig "github.com/DataDog/datadog-agent/comp/core/config"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/api/security"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/security/probe/config"
cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -53,25 +53,20 @@ func (n *nullTagger) Tag(_ types.EntityID, _ types.TagCardinality) ([]string, er
return nil, nil
}

func (n *nullTagger) RegisterListener(_ Event, _ Listener) error {
return nil
}

// Resolver represents a cache resolver
type Resolver interface {
Start(ctx context.Context) error
Stop() error
Resolve(id string) []string
ResolveWithErr(id string) ([]string, error)
GetValue(id string, tag string) string
RegisterListener(event Event, listener Listener) error
RegisterListener(event Event, listener utils.Listener[*cgroupModel.CacheEntry]) error
}

// DefaultResolver represents a default resolver based directly on the underlying tagger
type DefaultResolver struct {
tagger Tagger
listenersLock sync.Mutex
listeners map[Event][]Listener
*utils.Notifier[Event, *cgroupModel.CacheEntry]
tagger Tagger
}

// Resolve returns the tags for the given id
Expand Down Expand Up @@ -119,26 +114,12 @@ func (t *DefaultResolver) Stop() error {
return t.tagger.Stop()
}

// RegisterListener registers a tags event listener
func (t *DefaultResolver) RegisterListener(event Event, listener Listener) error {
t.listenersLock.Lock()
defer t.listenersLock.Unlock()

if t.listeners != nil {
t.listeners[event] = append(t.listeners[event], listener)
} else {
return fmt.Errorf("a listener was inserted before initialization")
}
return nil
}

// NewDefaultResolver returns a new default tags resolver
func NewDefaultResolver(config *config.Config, telemetry telemetry.Component) *DefaultResolver {
ddConfig := pkgconfigsetup.Datadog()
listeners := make(map[Event][]Listener)
resolver := &DefaultResolver{
tagger: &nullTagger{},
listeners: listeners,
tagger: &nullTagger{},
Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](),
}

if config.RemoteTaggerEnabled {
Expand Down
29 changes: 3 additions & 26 deletions pkg/security/resolvers/tags/resolver_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,12 @@ import (
"fmt"
"time"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/security/probe/config"
"github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup"
cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Listener is used to propagate tags events
type Listener func(workload *cgroupModel.CacheEntry)

// LinuxResolver represents a default resolver based directly on the underlying tagger
type LinuxResolver struct {
*DefaultResolver
Expand Down Expand Up @@ -82,12 +74,7 @@ func (t *LinuxResolver) checkTags(workload *cgroupModel.CacheEntry) {
}
}

// notify listeners
t.listenersLock.Lock()
defer t.listenersLock.Unlock()
for _, l := range t.listeners[WorkloadSelectorResolved] {
l(workload)
}
t.NotifyListener(WorkloadSelectorResolved, workload)
}

// fetchTags fetches tags for the provided workload
Expand All @@ -102,20 +89,10 @@ func (t *LinuxResolver) fetchTags(container *cgroupModel.CacheEntry) error {

// NewResolver returns a new tags resolver
func NewResolver(config *config.Config, telemetry telemetry.Component, cgroupsResolver *cgroup.Resolver) Resolver {
defaultResolver := NewDefaultResolver(config, telemetry)
workloadsWithoutTags := make(chan *cgroupModel.CacheEntry, 100)
resolver := &LinuxResolver{
DefaultResolver: defaultResolver,
workloadsWithoutTags: workloadsWithoutTags,
DefaultResolver: NewDefaultResolver(config, telemetry),
workloadsWithoutTags: make(chan *cgroupModel.CacheEntry, 100),
cgroupResolver: cgroupsResolver,
}
if config.RemoteTaggerEnabled {
options, err := remote.NodeAgentOptionsForSecurityResolvers(pkgconfigsetup.Datadog())
if err != nil {
log.Errorf("unable to configure the remote tagger: %s", err)
} else {
resolver.tagger = remote.NewTagger(options, pkgconfigsetup.Datadog(), taggerTelemetry.NewStore(telemetry), types.NewMatchAllFilter())
}
}
return resolver
}
27 changes: 10 additions & 17 deletions pkg/security/tests/fake_tags_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// FakeResolver represents a fake cache resolver
type FakeResolver struct {
sync.Mutex
*utils.Notifier[tags.Event, *cgroupModel.CacheEntry]
containerIDs []string
}

Expand Down Expand Up @@ -62,20 +63,18 @@ func (fr *FakeResolver) GetValue(id string, tag string) string {
return utils.GetTagValue(tag, fr.Resolve(id))
}

// RegisterListener registers a tags event listener
func (fr *FakeResolver) RegisterListener(_ tags.Event, _ tags.Listener) error {
return nil
}

// NewFakeResolverDifferentImageNames returns a new tags resolver
func NewFakeResolverDifferentImageNames() tags.Resolver {
return &FakeResolver{}
return &FakeResolver{
Notifier: utils.NewNotifier[tags.Event, *cgroupModel.CacheEntry](),
}
}

// This fake resolver will allways give the same image_name, no matter the container ID

// FakeMonoResolver represents a fake mono resolver
type FakeMonoResolver struct {
*utils.Notifier[tags.Event, *cgroupModel.CacheEntry]
}

// Start the resolver
Expand Down Expand Up @@ -103,21 +102,19 @@ func (fmr *FakeMonoResolver) GetValue(id string, tag string) string {
return utils.GetTagValue(tag, fmr.Resolve(id))
}

// RegisterListener registers a tags event listener
func (fmr *FakeMonoResolver) RegisterListener(_ tags.Event, _ tags.Listener) error {
return nil
}

// NewFakeMonoResolver returns a new tags resolver
func NewFakeMonoResolver() tags.Resolver {
return &FakeMonoResolver{}
return &FakeMonoResolver{
Notifier: utils.NewNotifier[tags.Event, *cgroupModel.CacheEntry](),
}
}

// This fake resolver will let us specify the next containerID to be resolved

// FakeManualResolver represents a fake manual resolver
type FakeManualResolver struct {
sync.Mutex
*utils.Notifier[tags.Event, *cgroupModel.CacheEntry]
containerToSelector map[string]*cgroupModel.WorkloadSelector
cpt int
nextSelectors []*cgroupModel.WorkloadSelector
Expand Down Expand Up @@ -189,14 +186,10 @@ func (fmr *FakeManualResolver) GetValue(id string, tag string) string {
return utils.GetTagValue(tag, fmr.Resolve(id))
}

// RegisterListener registers a tags event listener
func (fmr *FakeManualResolver) RegisterListener(_ tags.Event, _ tags.Listener) error {
return nil
}

// NewFakeManualResolver returns a new tags resolver
func NewFakeManualResolver() *FakeManualResolver {
return &FakeManualResolver{
Notifier: utils.NewNotifier[tags.Event, *cgroupModel.CacheEntry](),
containerToSelector: make(map[string]*cgroupModel.WorkloadSelector),
}
}
53 changes: 53 additions & 0 deletions pkg/security/utils/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package utils holds utils related files
package utils

import (
"fmt"
"sync"
)

type event interface{ comparable }

// Listener describes the callback called by a notifier
type Listener[O any] func(obj O)

// Notifier describes a type that calls back listener that registered for a specific set of events
type Notifier[E event, O any] struct {
listenersLock sync.RWMutex
listeners map[E][]Listener[O]
}

// RegisterListener registers an event listener
func (n *Notifier[E, O]) RegisterListener(event E, listener Listener[O]) error {
n.listenersLock.Lock()
defer n.listenersLock.Unlock()

if n.listeners != nil {
n.listeners[event] = append(n.listeners[event], listener)
} else {
return fmt.Errorf("a listener was inserted before initialization")
}
return nil
}

func (n *Notifier[E, O]) NotifyListener(event E, obj O) {
// notify listeners
n.listenersLock.RLock()
for _, l := range n.listeners[event] {
l(obj)
}
n.listenersLock.RUnlock()

}

// NewNotifier returns a new notifier
func NewNotifier[E event, O any]() *Notifier[E, O] {
return &Notifier[E, O]{
listeners: make(map[E][]Listener[O]),
}
}

0 comments on commit a3247b6

Please sign in to comment.