Skip to content

Commit

Permalink
Fix compilation on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Nov 12, 2024
1 parent fa9ec33 commit 68264f5
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 94 deletions.
118 changes: 24 additions & 94 deletions pkg/security/resolvers/tags/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,20 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/security/probe/config"
"github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup"
cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Event defines the tags event type
type Event int

// Listener is used to propagate tags events
type Listener func(workload *cgroupModel.CacheEntry)

const (
// WorkloadSelectorResolved is used to notify that a new cgroup with a resolved workload selector is ready
WorkloadSelectorResolved Event = iota
Expand Down Expand Up @@ -73,53 +67,9 @@ type Resolver interface {

// DefaultResolver represents a default resolver based directly on the underlying tagger
type DefaultResolver struct {
tagger Tagger
listenersLock sync.Mutex
listeners map[Event][]Listener
workloadsWithoutTags chan *cgroupModel.CacheEntry
cgroupResolver *cgroup.Resolver
}

// Start the resolver
func (t *DefaultResolver) Start(ctx context.Context) error {
if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, t.checkTags); err != nil {
return err
}

go func() {
if err := t.tagger.Start(ctx); err != nil {
log.Errorf("failed to init tagger: %s", err)
}
}()

go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

delayerTick := time.NewTicker(10 * time.Second)
defer delayerTick.Stop()

for {
select {
case <-ctx.Done():
return
case <-delayerTick.C:
select {
case workload := <-t.workloadsWithoutTags:
t.checkTags(workload)
default:
}

}
}
}()

go func() {
<-ctx.Done()
_ = t.tagger.Stop()
}()

return nil
tagger Tagger
listenersLock sync.Mutex
listeners map[Event][]Listener
}

// Resolve returns the tags for the given id
Expand All @@ -146,66 +96,46 @@ func (t *DefaultResolver) GetValue(id string, tag string) string {
return utils.GetTagValue(tag, t.Resolve(id))
}

// Stop the resolver
func (t *DefaultResolver) Stop() error {
return t.tagger.Stop()
}

// checkTags checks if the tags of a workload were properly set
func (t *DefaultResolver) checkTags(workload *cgroupModel.CacheEntry) {
// check if the workload tags were found
if workload.NeedsTagsResolution() {
// this is a container, try to resolve its tags now
if err := t.fetchTags(workload); err != nil || workload.NeedsTagsResolution() {
// push to the workloadsWithoutTags chan so that its tags can be resolved later
select {
case t.workloadsWithoutTags <- workload:
default:
}
return
// Start the resolver
func (t *DefaultResolver) Start(ctx context.Context) error {
go func() {
if err := t.tagger.Start(ctx); err != nil {
log.Errorf("failed to init tagger: %s", err)
}
}
}()

// notify listeners
t.listenersLock.Lock()
defer t.listenersLock.Unlock()
for _, l := range t.listeners[WorkloadSelectorResolved] {
l(workload)
}
}
go func() {
<-ctx.Done()
_ = t.tagger.Stop()
}()

// fetchTags fetches tags for the provided workload
func (t *DefaultResolver) fetchTags(container *cgroupModel.CacheEntry) error {
newTags, err := t.ResolveWithErr(string(container.ContainerID))
if err != nil {
return fmt.Errorf("failed to resolve %s: %w", container.ContainerID, err)
}
container.SetTags(newTags)
return nil
}

// RegisterListener registers a CGroup event listener
// Stop the resolver
func (t *DefaultResolver) Stop() error {
return t.tagger.Stop()
}

// RegisterListener registers a tags event listener
func (t *DefaultResolver) RegisterListener(event Event, listener Listener) error {
t.listenersLock.Lock()
defer t.listenersLock.Unlock()

if t.listeners != nil {
t.listeners[event] = append(t.listeners[event], listener)
} else {
return fmt.Errorf("a Listener was inserted before initialization")
return fmt.Errorf("a listener was inserted before initialization")
}
return nil
}

// NewResolver returns a new tags resolver
func NewResolver(config *config.Config, telemetry telemetry.Component, cgroupsResolver *cgroup.Resolver) Resolver {
workloadsWithoutTags := make(chan *cgroupModel.CacheEntry, 100)
// NewDefaultResolver returns a new default tags resolver
func NewDefaultResolver(config *config.Config, telemetry telemetry.Component) *DefaultResolver {
listeners := make(map[Event][]Listener)
resolver := &DefaultResolver{
tagger: &nullTagger{},
workloadsWithoutTags: workloadsWithoutTags,
listeners: listeners,
cgroupResolver: cgroupsResolver,
tagger: &nullTagger{},
listeners: listeners,
}
if config.RemoteTaggerEnabled {
options, err := remote.NodeAgentOptionsForSecurityResolvers(pkgconfigsetup.Datadog())
Expand Down
121 changes: 121 additions & 0 deletions pkg/security/resolvers/tags/resolver_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package tags holds tags related files
package tags

import (
"context"
"fmt"
"time"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/remote"
taggerTelemetry "github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/security/probe/config"
"github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup"
cgroupModel "github.com/DataDog/datadog-agent/pkg/security/resolvers/cgroup/model"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Listener is used to propagate tags events
type Listener func(workload *cgroupModel.CacheEntry)

// LinuxResolver represents a default resolver based directly on the underlying tagger
type LinuxResolver struct {
*DefaultResolver
workloadsWithoutTags chan *cgroupModel.CacheEntry
cgroupResolver *cgroup.Resolver
}

// Start the resolver
func (t *LinuxResolver) Start(ctx context.Context) error {
if err := t.DefaultResolver.Start(ctx); err != nil {
return err
}

if err := t.cgroupResolver.RegisterListener(cgroup.CGroupCreated, t.checkTags); err != nil {
return err
}

go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

delayerTick := time.NewTicker(10 * time.Second)
defer delayerTick.Stop()

for {
select {
case <-ctx.Done():
return
case <-delayerTick.C:
select {
case workload := <-t.workloadsWithoutTags:
t.checkTags(workload)
default:
}

}
}
}()

return nil
}

// checkTags checks if the tags of a workload were properly set
func (t *LinuxResolver) checkTags(workload *cgroupModel.CacheEntry) {
// check if the workload tags were found
if workload.NeedsTagsResolution() {
// this is a container, try to resolve its tags now
if err := t.fetchTags(workload); err != nil || workload.NeedsTagsResolution() {
// push to the workloadsWithoutTags chan so that its tags can be resolved later
select {
case t.workloadsWithoutTags <- workload:
default:
}
return
}
}

// notify listeners
t.listenersLock.Lock()
defer t.listenersLock.Unlock()
for _, l := range t.listeners[WorkloadSelectorResolved] {
l(workload)
}
}

// fetchTags fetches tags for the provided workload
func (t *LinuxResolver) fetchTags(container *cgroupModel.CacheEntry) error {
newTags, err := t.ResolveWithErr(string(container.ContainerID))
if err != nil {
return fmt.Errorf("failed to resolve %s: %w", container.ContainerID, err)
}
container.SetTags(newTags)
return nil
}

// NewResolver returns a new tags resolver
func NewResolver(config *config.Config, telemetry telemetry.Component, cgroupsResolver *cgroup.Resolver) Resolver {
defaultResolver := NewDefaultResolver(config, telemetry)
workloadsWithoutTags := make(chan *cgroupModel.CacheEntry, 100)
resolver := &LinuxResolver{
DefaultResolver: defaultResolver,
workloadsWithoutTags: workloadsWithoutTags,
cgroupResolver: cgroupsResolver,
}
if config.RemoteTaggerEnabled {
options, err := remote.NodeAgentOptionsForSecurityResolvers(pkgconfigsetup.Datadog())
if err != nil {
log.Errorf("unable to configure the remote tagger: %s", err)
} else {
resolver.tagger = remote.NewTagger(options, pkgconfigsetup.Datadog(), taggerTelemetry.NewStore(telemetry), types.NewMatchAllFilter())
}
}
return resolver
}
22 changes: 22 additions & 0 deletions pkg/security/resolvers/tags/resolver_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !linux

// Package tags holds tags related files
package tags

import (
"github.com/DataDog/datadog-agent/comp/core/telemetry"
"github.com/DataDog/datadog-agent/pkg/security/probe/config"
)

// Listener is used to propagate tags events
type Listener func(workload interface{})

// NewResolver returns a new tags resolver
func NewResolver(config *config.Config, telemetry telemetry.Component) Resolver {
return NewDefaultResolver(config, telemetry)
}

0 comments on commit 68264f5

Please sign in to comment.