Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EBPF] refactored gpu probe to decouple init and start phases #30615

Merged
merged 15 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/system-probe/modules/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ var GPUMonitoring = module.Factory{
return nil, fmt.Errorf("unable to initialize NVML library: %v", ret)
}

t, err := gpu.NewProbe(c, probeDeps)
p, err := gpu.NewProbe(c, probeDeps)
if err != nil {
return nil, fmt.Errorf("unable to start GPU monitoring: %w", err)
return nil, fmt.Errorf("unable to start %s: %w", config.GPUMonitoringModule, err)
}

return &GPUMonitoringModule{
Probe: t,
Probe: p,
lastCheck: atomic.NewInt64(0),
}, nil
},
Expand Down
262 changes: 157 additions & 105 deletions pkg/gpu/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ package gpu

import (
"fmt"
"io"
"math"
"os"
"regexp"

manager "github.com/DataDog/ebpf-manager"
"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/rlimit"

sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/gpu/model"
ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
Expand All @@ -28,12 +30,43 @@ import (
)

const (
cudaEventMap = "cuda_events"
cudaAllocCacheMap = "cuda_alloc_cache"
gpuAttacherName = "gpu"
gpuAttacherName = "gpu"

// consumerChannelSize controls the size of the Go channel that buffers ring buffer
// events (*ddebpf.RingBufferHandler).
// Multiplying this value by the size of a single event gives the amount of heap memory pre-allocated by the Go runtime.
// TODO: we probably need to reduce this value (see pkg/network/protocols/events/configuration.go for reference)
consumerChannelSize = 4096
)

var (
// defaultRingBufferSize controls the amount of memory in bytes used for buffering perf event data
defaultRingBufferSize = os.Getpagesize()

// using a global var to avoid propagation between Probe ctor and event consumer startup
eventHandler ddebpf.EventHandler
)

// bpfMapName stores the name of the BPF maps storing statistics and other info
type bpfMapName = string

const (
cudaEventsMap bpfMapName = "cuda_events"
cudaAllocCacheMap bpfMapName = "cuda_alloc_cache"
cudaSyncCacheMap bpfMapName = "cuda_sync_cache"
)

const consumerChannelSize = 4096
// probeFuncName stores the ebpf hook function name
type probeFuncName = string
val06 marked this conversation as resolved.
Show resolved Hide resolved

const (
cudaLaunchKernelProbe probeFuncName = "uprobe__cudaLaunchKernel"
cudaMallocProbe probeFuncName = "uprobe__cudaMalloc"
cudaMallocRetProbe probeFuncName = "uretprobe__cudaMalloc"
cudaStreamSyncProbe probeFuncName = "uprobe__cudaStreamSynchronize"
cudaStreamSyncRetProbe probeFuncName = "uretprobe__cudaStreamSynchronize"
cudaFreeProbe probeFuncName = "uprobe__cudaFree"
)

// ProbeDependencies holds the dependencies for the probe
type ProbeDependencies struct {
Expand All @@ -46,7 +79,7 @@ type ProbeDependencies struct {

// Probe represents the GPU monitoring probe
type Probe struct {
mgr *ddebpf.Manager
m *ddebpf.Manager
cfg *config.Config
consumer *cudaEventConsumer
attacher *uprobes.UprobeAttacher
Expand All @@ -56,134 +89,80 @@ type Probe struct {
procMon *monitor.ProcessMonitor
}

// NewProbe starts the GPU monitoring probe, setting up the eBPF program and the uprobes, the
// NewProbe creates a GPU monitoring probe, containing relevant eBPF programs (uprobes), the
val06 marked this conversation as resolved.
Show resolved Hide resolved
// consumers for the events generated from the uprobes, and the stats generator to aggregate the data from
// streams into per-process GPU stats.
func NewProbe(cfg *config.Config, deps ProbeDependencies) (*Probe, error) {
log.Debugf("starting GPU monitoring probe...")
if err := config.CheckGPUSupported(); err != nil {
var err error
var m *ddebpf.Manager
if err = config.CheckGPUSupported(); err != nil {
return nil, err
}

var probe *Probe
log.Tracef("creating GPU monitoring probe...")
filename := "gpu.o"
if cfg.BPFDebug {
filename = "gpu-debug.o"
}
err := ddebpf.LoadCOREAsset(filename, func(buf bytecode.AssetReader, opts manager.Options) error {
var err error
probe, err = startGPUProbe(buf, opts, deps, cfg)
if err != nil {
return fmt.Errorf("cannot start GPU monitoring probe: %s", err)
}
log.Debugf("started GPU monitoring probe")
return nil

err = ddebpf.LoadCOREAsset(filename, func(ar bytecode.AssetReader, o manager.Options) error {
m, err = getManager(ar, o)
return err
})
if err != nil {
return nil, fmt.Errorf("loading asset: %s", err)
}

return probe, nil
}

func startGPUProbe(buf bytecode.AssetReader, opts manager.Options, deps ProbeDependencies, cfg *config.Config) (*Probe, error) {
mgr := ddebpf.NewManagerWithDefault(&manager.Manager{
Maps: []*manager.Map{
{Name: cudaAllocCacheMap},
}})

if opts.MapSpecEditors == nil {
opts.MapSpecEditors = make(map[string]manager.MapSpecEditor)
}

// Ring buffer size has to be a multiple of the page size, and we want to have at least 4096 bytes
pagesize := os.Getpagesize()
ringbufSize := pagesize
minRingbufSize := 4096
if minRingbufSize > ringbufSize {
ringbufSize = (minRingbufSize/pagesize + 1) * pagesize
}

opts.MapSpecEditors[cudaEventMap] = manager.MapSpecEditor{
Type: ebpf.RingBuf,
MaxEntries: uint32(ringbufSize),
KeySize: 0,
ValueSize: 0,
EditorFlag: manager.EditType | manager.EditMaxEntries | manager.EditKeyValue,
}

attachCfg := uprobes.AttacherConfig{
Rules: []*uprobes.AttachRule{
{
LibraryNameRegex: regexp.MustCompile(`libcudart\.so`),
Targets: uprobes.AttachToExecutable | uprobes.AttachToSharedLibraries,
ProbesSelector: []manager.ProbesSelector{
&manager.AllOf{
Selectors: []manager.ProbesSelector{
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uprobe__cudaLaunchKernel"}},
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uprobe__cudaMalloc"}},
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uretprobe__cudaMalloc"}},
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uprobe__cudaStreamSynchronize"}},
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uretprobe__cudaStreamSynchronize"}},
&manager.ProbeSelector{ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: "uprobe__cudaFree"}},
},
},
},
},
},
EbpfConfig: &cfg.Config,
PerformInitialScan: cfg.InitialProcessSync,
return nil, fmt.Errorf("error loading CO-RE %s: %w", sysconfig.GPUMonitoringModule, err)
}

attachCfg := getAttacherConfig(cfg)
// Note: this will later be replaced by a common way to enable the process monitor across system-probe
procMon := monitor.GetProcessMonitor()
if err := procMon.Initialize(false); err != nil {
return nil, fmt.Errorf("error initializing process monitor: %w", err)
}

attacher, err := uprobes.NewUprobeAttacher(gpuAttacherName, attachCfg, mgr, nil, &uprobes.NativeBinaryInspector{}, procMon)
attacher, err := uprobes.NewUprobeAttacher(gpuAttacherName, attachCfg, m, nil, &uprobes.NativeBinaryInspector{}, procMon)
if err != nil {
return nil, fmt.Errorf("error creating uprobes attacher: %w", err)
}

if err := rlimit.RemoveMemlock(); err != nil {
return nil, err
sysCtx, err := getSystemContext(deps.NvmlLib, cfg.ProcRoot)
if err != nil {
return nil, fmt.Errorf("error getting system context: %w", err)
}

p := &Probe{
mgr: mgr,
m: m,
cfg: cfg,
attacher: attacher,
deps: deps,
procMon: procMon,
sysCtx: sysCtx,
}

p.sysCtx, err = getSystemContext(deps.NvmlLib, cfg.Config.ProcRoot)
if err != nil {
return nil, fmt.Errorf("error getting system context: %w", err)
}
p.consumer = newCudaEventConsumer(sysCtx, eventHandler, p.cfg)
//TODO: decouple this to avoid sharing streamHandlers between consumer and statsGenerator
p.statsGenerator = newStatsGenerator(sysCtx, p.consumer.streamHandlers)

now, err := ddebpf.NowNanoseconds()
if err != nil {
return nil, fmt.Errorf("error getting current time: %w", err)
if err = p.start(); err != nil {
return nil, err
}
log.Tracef("GPU monitoring probe successfully started")
return p, nil
}

p.startEventConsumer()
p.statsGenerator = newStatsGenerator(p.sysCtx, now, p.consumer.streamHandlers)

if err := mgr.InitWithOptions(buf, &opts); err != nil {
return nil, fmt.Errorf("failed to init manager: %w", err)
}
// start launches the event consumer, then starts the eBPF manager and the uprobe attacher, returning the first error reported by the manager or the attacher
func (p *Probe) start() error {
log.Tracef("starting GPU monitoring probe...")
p.consumer.Start()

if err := mgr.Start(); err != nil {
return nil, fmt.Errorf("failed to start manager: %w", err)
if err := p.m.Start(); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}

if err := attacher.Start(); err != nil {
return nil, fmt.Errorf("error starting uprobes attacher: %w", err)
if err := p.attacher.Start(); err != nil {
return fmt.Errorf("error starting uprobes attacher: %w", err)
}

return p, nil
return nil
}

// Close stops the probe
Expand All @@ -196,7 +175,7 @@ func (p *Probe) Close() {
p.attacher.Stop()
}

_ = p.mgr.Stop(manager.CleanAll)
_ = p.m.Stop(manager.CleanAll)

if p.consumer != nil {
p.consumer.Stop()
Expand All @@ -209,9 +188,7 @@ func (p *Probe) GetAndFlush() (*model.GPUStats, error) {
if err != nil {
return nil, fmt.Errorf("error getting current time: %w", err)
}

stats := p.statsGenerator.getStats(now)

p.cleanupFinished()

return stats, nil
Expand All @@ -222,16 +199,91 @@ func (p *Probe) cleanupFinished() {
p.consumer.cleanFinishedHandlers()
}

func (p *Probe) startEventConsumer() {
handler := ddebpf.NewRingBufferHandler(consumerChannelSize)
// toPowerOf2 rounds x to its nearest power of 2. "Nearest" is measured on a
// logarithmic scale: log2(x) is rounded to the closest integer exponent, so
// for example 5 becomes 4 and 6 becomes 8.
//
// Values below 1 have no usable logarithm (math.Log2 yields -Inf or NaN), so
// they return 1. Values so large that rounding could overflow int are clamped
// to the largest power of 2 that fits in an int.
func toPowerOf2(x int) int {
	// Largest power of 2 representable as an int (2^62 when int is 64 bits wide).
	const maxPowerOf2 = math.MaxInt/2 + 1

	if x < 1 {
		return 1
	}
	if x >= maxPowerOf2 {
		return maxPowerOf2
	}

	// x is in [1, maxPowerOf2), so the rounded exponent is non-negative and the
	// shift below cannot overflow. Shifting keeps the result in integer
	// arithmetic instead of converting a float produced by math.Pow.
	exp := int(math.Round(math.Log2(float64(x))))
	return 1 << exp
}

// setupSharedBuffer sets up the ring buffer that handles the CUDA events produced by the eBPF uprobes.
// It must be called BEFORE the manager's InitWithOptions method is called.
func setupSharedBuffer(m *manager.Manager, o *manager.Options) {
rbHandler := ddebpf.NewRingBufferHandler(consumerChannelSize)
rb := &manager.RingBuffer{
Map: manager.Map{Name: cudaEventMap},
Map: manager.Map{Name: cudaEventsMap},
RingBufferOptions: manager.RingBufferOptions{
RecordHandler: handler.RecordHandler,
RecordGetter: handler.RecordGetter,
RecordHandler: rbHandler.RecordHandler,
RecordGetter: rbHandler.RecordGetter,
},
}
p.mgr.RingBuffers = append(p.mgr.RingBuffers, rb)
p.consumer = newCudaEventConsumer(p.sysCtx, handler, p.cfg)
p.consumer.Start()

ringBufferSize := toPowerOf2(defaultRingBufferSize)

o.MapSpecEditors[cudaEventsMap] = manager.MapSpecEditor{
Type: ebpf.RingBuf,
MaxEntries: uint32(ringBufferSize),
KeySize: 0,
ValueSize: 0,
EditorFlag: manager.EditType | manager.EditMaxEntries | manager.EditKeyValue,
}

m.RingBuffers = append(m.RingBuffers, rb)
eventHandler = rbHandler
}

// getAttacherConfig builds the uprobe attacher configuration for the GPU probe.
// It declares a single attach rule for libraries whose name matches
// `libcudart\.so`, targeting both executables and shared libraries, and groups
// the six CUDA uprobe/uretprobe selectors under one manager.AllOf selector.
// The eBPF config and the initial-scan flag are taken from cfg.
func getAttacherConfig(cfg *config.Config) uprobes.AttacherConfig {
	// eBPF function names of the hooks, in the order they are registered.
	hookNames := []string{
		cudaLaunchKernelProbe,
		cudaMallocProbe,
		cudaMallocRetProbe,
		cudaStreamSyncProbe,
		cudaStreamSyncRetProbe,
		cudaFreeProbe,
	}

	selectors := make([]manager.ProbesSelector, 0, len(hookNames))
	for _, name := range hookNames {
		selectors = append(selectors, &manager.ProbeSelector{
			ProbeIdentificationPair: manager.ProbeIdentificationPair{EBPFFuncName: name},
		})
	}

	cudaRuntimeRule := &uprobes.AttachRule{
		LibraryNameRegex: regexp.MustCompile(`libcudart\.so`),
		Targets:          uprobes.AttachToExecutable | uprobes.AttachToSharedLibraries,
		ProbesSelector: []manager.ProbesSelector{
			&manager.AllOf{Selectors: selectors},
		},
	}

	return uprobes.AttacherConfig{
		Rules:              []*uprobes.AttachRule{cudaRuntimeRule},
		EbpfConfig:         &cfg.Config,
		PerformInitialScan: cfg.InitialProcessSync,
	}
}

// getManager creates the eBPF manager for the GPU probe and initializes it from
// the object file readable through buf. The manager is declared with the
// cuda_alloc_cache and cuda_sync_cache maps and no probes; the ring buffer for
// CUDA events is registered through setupSharedBuffer before initialization.
// It returns the initialized manager, or a wrapped error if InitWithOptions fails.
func getManager(buf io.ReaderAt, opts manager.Options) (*ddebpf.Manager, error) {
val06 marked this conversation as resolved.
Show resolved Hide resolved
m := ddebpf.NewManagerWithDefault(&manager.Manager{
/* We don't initialize the probes list here, because the manager would try to attach them at startup
and fail: they are uprobes, and their full path is resolved at runtime by the uprobeAttacher.
The uprobeAttacher will add those probes later via the manager.AddHook API.

All of the manager's modifiers will still run, as they operate on the manager's ProgramSpecs map,
which is populated while parsing the ELF file and creating the CollectionSpec.
*/

Maps: []*manager.Map{
{
Name: cudaAllocCacheMap,
},
{
Name: cudaSyncCacheMap,
},
}})

// setupSharedBuffer writes an entry into opts.MapSpecEditors, so the map must exist first
if opts.MapSpecEditors == nil {
opts.MapSpecEditors = make(map[string]manager.MapSpecEditor)
}

// registers the ring buffer and its map spec editor; this has to happen before InitWithOptions
setupSharedBuffer(m.Manager, &opts)

if err := m.InitWithOptions(buf, &opts); err != nil {
return nil, fmt.Errorf("failed to init manager: %w", err)
}

return m, nil
}
Loading
Loading