Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EBPF-595] Create a common process consumer for eventmonitor #30559

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/eventmonitor/consumers/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux || windows
gjulianm marked this conversation as resolved.
Show resolved Hide resolved

// Package consumers contains consumer that can be readily used by other packages without
// having to implement the EventConsumerHandler interface manually:
// - ProcessConsumer (process.go): a consumer of process exec/exit events that can be subscribed to via callbacks
package consumers
val06 marked this conversation as resolved.
Show resolved Hide resolved
171 changes: 171 additions & 0 deletions pkg/eventmonitor/consumers/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux || windows

package consumers

import (
"fmt"
"sync"
"sync/atomic"

"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/security/secl/model"
)

// ProcessCallback is a callback function that will be called when a process exec/exit event is received
// Defined as an alias to avoid type incompatibilities
type ProcessCallback = func(uint32)

// ProcessConsumer represents a consumer of process exec/exit events that can be subscribed to
val06 marked this conversation as resolved.
Show resolved Hide resolved
// via callbacks
type ProcessConsumer struct {
// id is the identification of this consumer in the event data stream
id string

// chanSize is the channel size that will be used to send events to this consumer from the event data stream
chanSize int

// execCallbacks holds all subscriptors to process exec events
gjulianm marked this conversation as resolved.
Show resolved Hide resolved
execCallbacks callbackMap

// exitCallbacks holds all subscriptors to process exit events
gjulianm marked this conversation as resolved.
Show resolved Hide resolved
exitCallbacks callbackMap
}

// event represents the attributes of the generic eventmonitor type that we will copy and use in our process consumer
type event struct {
val06 marked this conversation as resolved.
Show resolved Hide resolved
eventType model.EventType
pid uint32
}

// ProcessConsumer should implement the EventConsumerHandler and EventConsumer interfaces
var _ eventmonitor.EventConsumerHandler = &ProcessConsumer{}
var _ eventmonitor.EventConsumer = &ProcessConsumer{}

// NewProcessConsumer creates a new ProcessConsumer, registering itself with the
// given event monitor. This function should be called with the EventMonitor
// instance created in cmd/system-probe/modules/eventmonitor.go:createEventMonitorModule.
// For tests, use consumers/testutil.NewTestProcessConsumer, which also initializes the event stream for testing accordingly
func NewProcessConsumer(id string, chanSize int, evm *eventmonitor.EventMonitor) (*ProcessConsumer, error) {
pc := &ProcessConsumer{
id: id,
chanSize: chanSize,
execCallbacks: callbackMap{callbacks: make(map[*ProcessCallback]struct{})},
exitCallbacks: callbackMap{callbacks: make(map[*ProcessCallback]struct{})},
}

if err := evm.AddEventConsumerHandler(pc); err != nil {
return nil, fmt.Errorf("cannot add event consumer handler: %w", err)
}

evm.RegisterEventConsumer(pc)

return pc, nil
}

// --- eventmonitor.EventConsumer interface methods

// Start starts the consumer, this is a no-op for this implementation
func (p *ProcessConsumer) Start() error {
return nil
}

// Stop stops the consumer, this is a no-op for this implementation
func (p *ProcessConsumer) Stop() {
}

// ID returns the ID of the consumer
func (p *ProcessConsumer) ID() string {
return p.id
}

// --- eventmonitor.EventConsumerHandler interface methods

// ChanSize returns the size of the channel that the event monitor will use to send events to this handler
func (p *ProcessConsumer) ChanSize() int {
return p.chanSize
}

// EventTypes returns the event types that this handler is interested in
func (p *ProcessConsumer) EventTypes() []model.EventType {
return []model.EventType{model.ExecEventType, model.ExitEventType}
}

// Copy copies the event from the eventmonitor type to the one we use in this struct
func (p *ProcessConsumer) Copy(ev *model.Event) any {
return &event{
eventType: ev.GetEventType(),
pid: ev.GetProcessPid(),
}
}

// HandleEvent handles the event from the event monitor
func (p *ProcessConsumer) HandleEvent(ev any) {
sevent, ok := ev.(*event)
if !ok {
return
}

switch sevent.eventType {
case model.ExecEventType:
p.execCallbacks.call(sevent.pid)
case model.ExitEventType:
p.exitCallbacks.call(sevent.pid)
}
}

// SubscribeExec subscribes to process exec events, and returns the function
// that needs to be called to unsubscribe
func (p *ProcessConsumer) SubscribeExec(callback ProcessCallback) func() {
return p.execCallbacks.add(callback)
}

// SubscribeExit subscribes to process exit events, and returns the function
// that needs to be called to unsubscribe
func (p *ProcessConsumer) SubscribeExit(callback ProcessCallback) func() {
return p.exitCallbacks.add(callback)
}

// callbackMap is a helper struct that holds a map of callbacks and a mutex to protect it
type callbackMap struct {
// callbacks holds the set of callbacks
callbacks map[*ProcessCallback]struct{}

// mutex is the mutex that protects the callbacks map
mutex sync.RWMutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the common pattern to use this struct?
in some cases rwmutex perform worse than the regular mutex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the same pattern as there is in process-monitor, I assumed it was tested to be the best option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common use case - adding all callbacks (write lock), and then calling them (read lock)

normally, all callbacks are being added (or removed) before (or after) the operation.
We can use regular mutex here, but, if we do intend to make the change - it should be tested to show it improves the usage of rwmutex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me and @brycekahle discussed rwmutex in some other context. and in some cases the rwmutex actually creates slight overhead (even when just using the readerlock without writer lock).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can do that, we're always going to (potentially) have two goroutines accessing the callback map concurrently, one from the event stream and another one from the subscription.

Copy link
Contributor

@val06 val06 Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any subscriptions happening in "execution" phase or rather we can limit the subscription during the "init" phase before the handler go-routine starts?

Copy link
Contributor

@val06 val06 Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also these benchmarks for reference

(maybe sync.Map is a better choice)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might not always be possible to limit the subscriptions, specially considering that the current way of adding event stream consumers requires having a global variable to share the consumer, and that could probably change later. Also, it'd require changing the SubscribeX signatures to be able to return an error.

I think it's a good idea to research this a bit more, but I think we should have a better view of dependencies in system-probe modules first to see how that fits with those phases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree that we can leave it out of scope for this PR. Resolving the comment


// hasCallbacks is a flag that indicates if there are any callbacks subscribed, used
// to avoid locking/unlocking the mutex if there are no callbacks
hasCallbacks atomic.Bool
val06 marked this conversation as resolved.
Show resolved Hide resolved
}

// add adds a callback to the callback map and returns a function that can be called to remove it
func (c *callbackMap) add(cb ProcessCallback) func() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.callbacks[&cb] = struct{}{}
c.hasCallbacks.Store(true)

return func() {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.callbacks, &cb)
c.hasCallbacks.Store(len(c.callbacks) > 0)
}
}

func (c *callbackMap) call(pid uint32) {
if !c.hasCallbacks.Load() {
return
}

c.mutex.RLock()
defer c.mutex.RUnlock()
for cb := range c.callbacks {
(*cb)(pid)
}
}
35 changes: 35 additions & 0 deletions pkg/eventmonitor/consumers/testutil/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build (linux || windows) && test

// Package testutil provides utilities for using the event monitor in tests
package testutil

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers"

eventtestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/testutil"
)

const defaultChanSize = 100

// NewTestProcessConsumer creates a new ProcessConsumer for testing, registering itself with an event monitor
// created for testing. This function should be called in tests that require a ProcessConsumer.
func NewTestProcessConsumer(t *testing.T) *consumers.ProcessConsumer {
var pc *consumers.ProcessConsumer
eventtestutil.StartEventMonitor(t, func(t *testing.T, evm *eventmonitor.EventMonitor) {
var err error
pc, err = consumers.NewProcessConsumer("test", defaultChanSize, evm)
require.NoError(t, err, "failed to create process consumer")
})

return pc
}
Loading