diff --git a/README.md b/README.md index 5f3c1bd1..3922aa8a 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,9 @@ devfiler spins up a local server that listens on `0.0.0.0:11000`. To run it, simply download and unpack the archive from the following URL: -https://upload.elastic.co/d/87e7697991940ec37f0c6e39ac38d213f65e8dc1ef9dbedff6aab9cba0adfaba +https://upload.elastic.co/d/f8aa0c386baa808a616ca29f86b34c726edb5af36f8840a4cf28468ad534a4b5 -Authentication token: `c74dfc4db2212015` +Authentication token: `2635c0750bf8ea69` The archive contains a build for each of the following platforms: diff --git a/go.mod b/go.mod index d94fc264..145446b0 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,12 @@ module go.opentelemetry.io/ebpf-profiler go 1.22.2 require ( + github.com/aws/aws-sdk-go-v2 v1.30.5 github.com/aws/aws-sdk-go-v2/config v1.27.35 - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21 github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.16.0 - github.com/elastic/go-freelru v0.15.0 + github.com/elastic/go-freelru v0.16.0 github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a github.com/google/uuid v1.6.0 github.com/jsimonetti/rtnetlink v1.4.2 @@ -29,7 +29,6 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.33 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect diff --git a/go.sum b/go.sum index 10c61a9e..7fe300bb 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.33 h1:lBHAQQznENv0gLHAZ73ONiTSkCt github.com/aws/aws-sdk-go-v2/credentials v1.17.33/go.mod h1:MBuqCUOT3ChfLuxNDGyra67eskx7ge9e3YKYBce7wpI= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21 h1:sV0doPPsRT7gMP0BnDPwSsysVTV/nKpB/nFmMnz8goE= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21/go.mod h1:ictvfJWqE2gkUFDRJVp5VU/TrytuzK88DYcpan7UYuA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= @@ -43,10 +41,8 @@ github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEn github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/go-freelru v0.13.0 h1:TKKY6yCfNNNky7Pj9xZAOEpBcdNgZJfihEftOb55omg= -github.com/elastic/go-freelru v0.13.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= -github.com/elastic/go-freelru v0.15.0 h1:Jo1aY8JAvpyxbTDJEudrsBfjFDaALpfVv8mxuh9sfvI= -github.com/elastic/go-freelru v0.15.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a h1:ymmtaN4bVCmKKeu4XEf6JEWNZKRXPMng1zjpKd+8rCU= github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM= github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= diff --git a/host/host.go b/host/host.go index c12a02a3..4b6a9d44 100644 --- a/host/host.go +++ b/host/host.go @@ -55,4 +55,5 @@ type Trace struct { TID libpf.PID APMTraceID libpf.APMTraceID APMTransactionID libpf.APMTransactionID + CPU int } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 12628306..17f796ef 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -56,8 +56,8 @@ func (c *Controller) Start(ctx context.Context) error { traceHandlerCacheSize := traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores)) - intervals := times.New(c.config.MonitorInterval, - c.config.ReporterInterval, c.config.ProbabilisticInterval) + intervals := times.New(c.config.ReporterInterval, c.config.MonitorInterval, + c.config.ProbabilisticInterval) // Start periodic synchronization with the realtime clock times.StartRealtimeSync(ctx, c.config.ClockSyncInterval) diff --git a/libpf/cgroupv2.go b/libpf/cgroupv2.go new file mode 100644 index 00000000..aa5d952f --- /dev/null +++ b/libpf/cgroupv2.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" + +import ( + "bufio" + "fmt" + "os" + "regexp" + + lru "github.com/elastic/go-freelru" + log "github.com/sirupsen/logrus" +) + +var ( + cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) +) + +// LookupCgroupv2 returns the cgroupv2 ID for pid. +func LookupCgroupv2(cgrouplru *lru.SyncedLRU[PID, string], pid PID) (string, error) { + id, ok := cgrouplru.Get(pid) + if ok { + return id, nil + } + + // Slow path + f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + defer f.Close() + + var genericCgroupv2 string + scanner := bufio.NewScanner(f) + buf := make([]byte, 512) + // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). + // We can do that and also set a maximum allocation size on the following call. + // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't + // expect lines in /proc//cgroup to be longer than that. + scanner.Buffer(buf, 8192) + var pathParts []string + for scanner.Scan() { + line := scanner.Text() + pathParts = cgroupv2PathPattern.FindStringSubmatch(line) + if pathParts == nil { + log.Debugf("Could not extract cgroupv2 path from line: %s", line) + continue + } + genericCgroupv2 = pathParts[1] + break + } + + // Cache the cgroupv2 information. + // To avoid busy lookups, also empty cgroupv2 information is cached. + cgrouplru.Add(pid, genericCgroupv2) + + return genericCgroupv2, nil +} diff --git a/libpf/convenience.go b/libpf/convenience.go index 02c7cae1..1abdb604 100644 --- a/libpf/convenience.go +++ b/libpf/convenience.go @@ -5,9 +5,7 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" import ( "context" - "fmt" "math/rand/v2" - "os" "reflect" "time" "unsafe" @@ -15,24 +13,6 @@ import ( log "github.com/sirupsen/logrus" ) -// WriteTempFile writes a data buffer to a temporary file on the filesystem. It -// is the callers responsibility to clean up that file again. The function returns -// the filename if successful. -func WriteTempFile(data []byte, directory, prefix string) (string, error) { - file, err := os.CreateTemp(directory, prefix) - if err != nil { - return "", err - } - defer file.Close() - if _, err := file.Write(data); err != nil { - return "", fmt.Errorf("failed to write data to temporary file: %w", err) - } - if err := file.Sync(); err != nil { - return "", fmt.Errorf("failed to synchronize file data: %w", err) - } - return file.Name(), nil -} - // SleepWithJitter sleeps for baseDuration +/- jitter (jitter is [0..1]) func SleepWithJitter(baseDuration time.Duration, jitter float64) { time.Sleep(AddJitter(baseDuration, jitter)) diff --git a/libpf/libpf.go b/libpf/libpf.go index 653d9ef5..64e51d13 100644 --- a/libpf/libpf.go +++ b/libpf/libpf.go @@ -5,8 +5,6 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" import ( "encoding/json" - "fmt" - "math" "time" ) @@ -32,36 +30,9 @@ func NowAsUInt32() uint32 { return uint32(time.Now().Unix()) } -// UnixTime64 represents nanoseconds or (reduced precision) seconds since epoch. +// UnixTime64 represents nanoseconds since epoch. type UnixTime64 uint64 -func (t UnixTime64) MarshalJSON() ([]byte, error) { - if t > math.MaxUint32 { - // Nanoseconds, ES does not support 'epoch_nanoseconds' so - // we have to pass it a value formatted as 'strict_date_optional_time_nanos'. - out := []byte(fmt.Sprintf("%q", - time.Unix(0, int64(t)).UTC().Format(time.RFC3339Nano))) - return out, nil - } - - // Reduced precision seconds-since-the-epoch, ES 'epoch_second' formatter will match these. - out := []byte(fmt.Sprintf("%d", t)) - return out, nil -} - -// Unix returns the value as seconds since epoch. -func (t UnixTime64) Unix() int64 { - if t > math.MaxUint32 { - // Nanoseconds, convert to seconds-since-the-epoch - return time.Unix(0, int64(t)).Unix() - } - - return int64(t) -} - -// Compile-time interface checks -var _ json.Marshaler = (*UnixTime64)(nil) - // AddressOrLineno represents a line number in an interpreted file or an offset into // a native file. type AddressOrLineno uint64 diff --git a/libpf/libpf_test.go b/libpf/libpf_test.go index d2f0e938..38dfea1e 100644 --- a/libpf/libpf_test.go +++ b/libpf/libpf_test.go @@ -4,12 +4,9 @@ package libpf import ( - "fmt" - "strconv" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestTraceType(t *testing.T) { @@ -45,35 +42,3 @@ func TestTraceType(t *testing.T) { assert.Equal(t, test.str, test.ty.String()) } } - -func TestUnixTime64_MarshalJSON(t *testing.T) { - tests := []struct { - name string - time UnixTime64 - want []byte - }{ - { - name: "zero", - time: UnixTime64(0), - want: []byte(strconv.Itoa(0)), - }, - { - name: "non-zero, seconds since the epoch", - time: UnixTime64(1710349106), - want: []byte(strconv.Itoa(1710349106)), - }, - { - name: "non-zero, nanoseconds since the epoch", - time: UnixTime64(1710349106864964685), - want: []byte(fmt.Sprintf("%q", "2024-03-13T16:58:26.864964685Z")), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - b, err := test.time.MarshalJSON() - require.NoError(t, err) - assert.Equal(t, test.want, b) - }) - } -} diff --git a/libpf/pfelf/pfelf.go b/libpf/pfelf/pfelf.go index c3c66e08..a0dc8b67 100644 --- a/libpf/pfelf/pfelf.go +++ b/libpf/pfelf/pfelf.go @@ -20,24 +20,6 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf" ) -// SafeOpenELF opens the given ELF file in a safely way in that -// it recovers from panics inside elf.Open(). -// Under circumstances we see fatal errors from inside the runtime, which -// are not recoverable, e.g. "fatal error: runtime: out of memory". -func SafeOpenELF(name string) (elfFile *elf.File, err error) { - defer func() { - // debug/elf has issues with malformed ELF files - if r := recover(); r != nil { - if elfFile != nil { - elfFile.Close() - elfFile = nil - } - err = fmt.Errorf("failed to open ELF file (recovered from panic): %s", name) - } - }() - return elf.Open(name) -} - // HasDWARFData returns true if the provided ELF file contains actionable DWARF debugging // information. // This function does not call `elfFile.DWARF()` on purpose, as it can be extremely expensive in @@ -132,41 +114,6 @@ func GetDebugLink(elfFile *elf.File) (linkName string, crc32 int32, err error) { return ParseDebugLink(sectionData) } -// GetDebugAltLink returns the contents of the `.gnu_debugaltlink` section (path and build -// ID). If no link is present, ErrNoDebugLink is returned. -func GetDebugAltLink(elfFile *elf.File) (fileName, buildID string, err error) { - // The .gnu_debugaltlink section is not always present - sectionData, err := getSectionData(elfFile, ".gnu_debugaltlink") - if err != nil { - return "", "", ErrNoDebugLink - } - - // The whole .gnu_debugaltlink section consists of: - // 1) path to target (variable-length string) - // 2) null character separator (1 byte) - // 3) build ID (usually 20 bytes, but can vary) - // - // First, find the position of the null character: - nullCharIdx := bytes.IndexByte(sectionData, 0) - if nullCharIdx == -1 { - return "", "", nil - } - - // The path consists of all the characters before the first null character - path := strings.ToValidUTF8(string(sectionData[:nullCharIdx]), "") - - // Check that we can read a build ID: there should be at least 1 byte after the null character. - if nullCharIdx+1 == len(sectionData) { - return "", "", errors.New("malformed .gnu_debugaltlink section (missing build ID)") - } - - // The build ID consists of all the bytes after the first null character - buildIDBytes := sectionData[nullCharIdx+1:] - buildID = hex.EncodeToString(buildIDBytes) - - return path, buildID, nil -} - var ErrNoBuildID = errors.New("no build ID") var ubuntuKernelSignature = regexp.MustCompile(` \(Ubuntu[^)]*\)\n$`) @@ -303,32 +250,6 @@ func getNoteHexString(sectionBytes []byte, name string, noteType uint32) ( return hex.EncodeToString(sectionBytes[idxDataStart:idxDataEnd]), true, nil } -// GetLinuxBuildSalt extracts the linux kernel build salt from the provided ELF path. -// It is read from the .notes ELF section. -// It should be present in both kernel modules and the kernel image of most distro-vended kernel -// packages, and should be identical across all the files: kernel modules will have the same salt -// as their corresponding vmlinux image if they were built at the same time. -// This can be used to identify the kernel image corresponding to a module. -// See https://lkml.org/lkml/2018/7/3/1156 -func GetLinuxBuildSalt(filePath string) (salt string, found bool, err error) { - elfFile, err := SafeOpenELF(filePath) - if err != nil { - return "", false, fmt.Errorf("could not open %s: %w", filePath, err) - } - defer elfFile.Close() - - sectionData, err := getSectionData(elfFile, ".note.Linux") - if err != nil { - sectionData, err = getSectionData(elfFile, ".notes") - if err != nil { - return "", false, nil - } - } - - // 0x100 is defined as LINUX_ELFNOTE_BUILD_SALT in include/linux/build-salt.h - return getNoteHexString(sectionData, "Linux", 0x100) -} - func symbolMapFromELFSymbols(syms []elf.Symbol) *libpf.SymbolMap { symmap := &libpf.SymbolMap{} for _, sym := range syms { @@ -342,16 +263,6 @@ func symbolMapFromELFSymbols(syms []elf.Symbol) *libpf.SymbolMap { return symmap } -// GetSymbols gets the symbols of elf.File and returns them as libpf.SymbolMap for -// fast lookup by address and name. -func GetSymbols(elfFile *elf.File) (*libpf.SymbolMap, error) { - syms, err := elfFile.Symbols() - if err != nil { - return nil, err - } - return symbolMapFromELFSymbols(syms), nil -} - // GetDynamicSymbols gets the dynamic symbols of elf.File and returns them as libpf.SymbolMap for // fast lookup by address and name. func GetDynamicSymbols(elfFile *elf.File) (*libpf.SymbolMap, error) { @@ -362,43 +273,6 @@ func GetDynamicSymbols(elfFile *elf.File) (*libpf.SymbolMap, error) { return symbolMapFromELFSymbols(syms), nil } -// IsKernelModule returns true if the provided ELF file looks like a kernel module (an ELF with a -// .modinfo and .gnu.linkonce.this_module sections). -func IsKernelModule(file *elf.File) (bool, error) { - sectionFound, err := HasSection(file, ".modinfo") - if err != nil { - return false, err - } - - if !sectionFound { - return false, nil - } - - return HasSection(file, ".gnu.linkonce.this_module") -} - -// IsKernelImage returns true if the provided ELF file looks like a kernel image (an ELF with a -// __modver section). -func IsKernelImage(file *elf.File) (bool, error) { - return HasSection(file, "__modver") -} - -// IsKernelFile returns true if the provided ELF file looks like a kernel file (either a kernel -// image or a kernel module). -func IsKernelFile(file *elf.File) (bool, error) { - isModule, err := IsKernelImage(file) - if err != nil { - return false, err - } - - isImage, err := IsKernelModule(file) - if err != nil { - return false, err - } - - return isModule || isImage, nil -} - // IsGoBinary returns true if the provided file is a Go binary (= an ELF file with // a known Golang section). func IsGoBinary(file *elf.File) (bool, error) { diff --git a/libpf/readatbuf/readatbuf.go b/libpf/readatbuf/readatbuf.go index 0955e527..0596aaf7 100644 --- a/libpf/readatbuf/readatbuf.go +++ b/libpf/readatbuf/readatbuf.go @@ -111,6 +111,9 @@ func (reader *Reader) ReadAt(p []byte, off int64) (int, error) { if err != nil { return int(writeOffset), err } + if skipOffset > uint(len(data)) { + return 0, io.EOF + } copyLen := min(remaining, uint(len(data))-skipOffset) copy(p[writeOffset:][:copyLen], data[skipOffset:][:copyLen]) diff --git a/libpf/readatbuf/readatbuf_test.go b/libpf/readatbuf/readatbuf_test.go index b3d38e70..ac0d219f 100644 --- a/libpf/readatbuf/readatbuf_test.go +++ b/libpf/readatbuf/readatbuf_test.go @@ -5,6 +5,7 @@ package readatbuf_test import ( "bytes" + "io" "testing" "github.com/stretchr/testify/require" @@ -25,3 +26,18 @@ func TestCaching(t *testing.T) { testVariant(t, 1346, 11, 55) testVariant(t, 889, 34, 111) } + +func TestOutOfBoundsTail(t *testing.T) { + buf := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6, 7}) + r, err := readatbuf.New(buf, 5, 10) + require.NoError(t, err) + b := make([]byte, 1) + for i := int64(0); i < 32; i++ { + _, err = r.ReadAt(b, i) + if i > 7 { + require.ErrorIs(t, err, io.EOF) + } else { + require.NoError(t, err) + } + } +} diff --git a/libpf/tracehash.go b/libpf/tracehash.go index 71d1df22..be2bee3b 100644 --- a/libpf/tracehash.go +++ b/libpf/tracehash.go @@ -28,15 +28,6 @@ func TraceHashFromBytes(b []byte) (TraceHash, error) { return TraceHash{h}, nil } -// TraceHashFromString parses a byte slice of a trace hash into the internal data representation. -func TraceHashFromString(s string) (TraceHash, error) { - h, err := basehash.New128FromString(s) - if err != nil { - return TraceHash{}, err - } - return TraceHash{h}, nil -} - func (h TraceHash) Equal(other TraceHash) bool { return h.Hash128.Equal(other.Hash128) } diff --git a/main.go b/main.go index 7a5e0330..106413b8 100644 --- a/main.go +++ b/main.go @@ -124,7 +124,7 @@ func mainWithExitCode() exitCode { GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), ReportInterval: intervals.ReportInterval(), - ExecutablesCacheElements: 4096, + ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. FramesCacheElements: 65536, CGroupCacheElements: 1024, diff --git a/metrics/metrics.go b/metrics/metrics.go index 53a74ff9..58cd83b2 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -34,8 +34,23 @@ var ( //go:embed metrics.json metricsJSON []byte + + // Used in fallback checks, e.g. to avoid sending "counters" with 0 values + metricTypes map[MetricID]MetricType ) +func init() { + defs, err := GetDefinitions() + if err != nil { + panic("extracting definitions from metrics.json") + } + + metricTypes = make(map[MetricID]MetricType, len(defs)) + for _, md := range defs { + metricTypes[md.ID] = md.Type + } +} + // reporterImpl allows swapping out the global metrics reporter. // // nil is a valid value indicating that metrics should be voided. @@ -104,6 +119,10 @@ func AddSlice(newMetrics []Metric) { continue } + if metric.Value == 0 && metricTypes[metric.ID] == MetricTypeCounter { + continue + } + idx := metric.ID / 64 mask := uint64(1) << (metric.ID % 64) // Metric IDs 1-7 correspond to CPU/IO/Agent metrics and are scheduled diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 38e2e549..664d19b1 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -41,6 +41,7 @@ func TestMetrics(t *testing.T) { {IDIOThroughput, MetricValue(55)}, {IDIODuration, MetricValue(66)}, {IDAgentGoRoutines, MetricValue(20)}, + {IDUnwindCallInterpreter, MetricValue(0)}, } AddSlice(inputMetrics[0:2]) // 33, 55 @@ -49,6 +50,10 @@ func TestMetrics(t *testing.T) { AddSlice(inputMetrics[3:4]) // 20 Add(inputMetrics[0].ID, inputMetrics[0].Value) // 33, dropped AddSlice(inputMetrics[1:3]) // 55, 66 dropped + AddSlice(inputMetrics[2:5]) // 66 dropped, 20 dropped, 0 dropped + + // Drop counter with 0 value as we don't expect it to appear in output + inputMetrics = inputMetrics[:4] // trigger reporting time.Sleep(1 * time.Second) diff --git a/proc/proc.go b/proc/proc.go index eeadeca6..4de6a249 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -121,10 +121,15 @@ func GetKernelModules(modulesPath string, count++ - nFields, _ := fmt.Sscanf(line, "%s %d %d %s %s 0x%x", + nFields, err := fmt.Sscanf(line, "%s %d %d %s %s 0x%x", &name, &size, &refcount, &dependencies, &state, &address) + if err != nil { + log.Warnf("error parsing line '%s' in modules: '%s'", line, err) + continue + } if nFields < 6 { - return nil, fmt.Errorf("unexpected line in modules: '%s'", line) + log.Warnf("unexpected line in modules: '%s'", line) + continue } if address == 0 { continue diff --git a/processmanager/ebpf/ebpf.go b/processmanager/ebpf/ebpf.go index 9f9dc90b..bb07da91 100644 --- a/processmanager/ebpf/ebpf.go +++ b/processmanager/ebpf/ebpf.go @@ -135,6 +135,8 @@ var outerMapsName = [...]string{ "exe_id_to_19_stack_deltas", "exe_id_to_20_stack_deltas", "exe_id_to_21_stack_deltas", + "exe_id_to_22_stack_deltas", + "exe_id_to_23_stack_deltas", } // Compile time check to make sure ebpfMapsImpl satisfies the interface . diff --git a/processmanager/ebpf/ebpf_test.go b/processmanager/ebpf/ebpf_test.go index d80a78fd..fec5bf5d 100644 --- a/processmanager/ebpf/ebpf_test.go +++ b/processmanager/ebpf/ebpf_test.go @@ -1,6 +1,3 @@ -//go:build !integration -// +build !integration - // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 @@ -12,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/ebpf-profiler/support" ) func TestMapID(t *testing.T) { @@ -26,6 +24,7 @@ func TestMapID(t *testing.T) { 0x3FF: 10, // 1023 0x400: 11, // 1024 0xFFFFF: 20, // 1048575 (2^20 - 1) + (1 << support.StackDeltaBucketLargest) - 1: support.StackDeltaBucketLargest, } for numStackDeltas, expectedShift := range testCases { numStackDeltas := numStackDeltas @@ -33,11 +32,11 @@ func TestMapID(t *testing.T) { t.Run(fmt.Sprintf("deltas %d", numStackDeltas), func(t *testing.T) { shift, err := getMapID(numStackDeltas) require.NoError(t, err) - assert.Equal(t, expectedShift, shift, - fmt.Sprintf("wrong map name for %d deltas", numStackDeltas)) + assert.Equal(t, expectedShift, shift, "wrong map name for %d deltas", + numStackDeltas) }) } - _, err := getMapID(1 << 22) + _, err := getMapID(1 << (support.StackDeltaBucketLargest + 1)) require.Error(t, err) } diff --git a/reporter/fifo.go b/reporter/fifo.go deleted file mode 100644 index e6c871b7..00000000 --- a/reporter/fifo.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" - -import ( - "fmt" - "sync" - - log "github.com/sirupsen/logrus" -) - -// FifoRingBuffer implements a first-in-first-out ring buffer that is safe for concurrent access. -type FifoRingBuffer[T any] struct { //nolint:gocritic - sync.Mutex - - // data holds the actual data. - data []T - - // emptyT is variable of type T used for nullifying entries in data[]. - emptyT T - - // name holds a string to uniquely identify the ring buffer in log messages. - name string - - // size is the maximum number of entries in the ring buffer. - size uint32 - - // readPos holds the position of the first element to be read in the data array. - readPos uint32 - - // writePos holds the position where the next element should be - // placed in the data array. - writePos uint32 - - // count holds a count of how many entries are in the array. - count uint32 - - // overwriteCount holds a count of the number of overwritten entries since the last metric - // report interval. - overwriteCount uint32 -} - -func (q *FifoRingBuffer[T]) InitFifo(size uint32, name string) error { - if size == 0 { - return fmt.Errorf("unsupported size of fifo: %d", size) - } - q.Lock() - defer q.Unlock() - q.size = size - q.data = make([]T, size) - q.readPos = 0 - q.writePos = 0 - q.count = 0 - q.overwriteCount = 0 - q.name = name - return nil -} - -// zeroFifo re-initializes the ring buffer and clears the data array, making previously -// stored elements available for GC. -func (q *FifoRingBuffer[T]) zeroFifo() { - if err := q.InitFifo(q.size, q.name); err != nil { - // Should never happen - panic(err) - } -} - -// Append adds element v to the FifoRingBuffer. it overwrites existing elements if there is no -// space left. -func (q *FifoRingBuffer[T]) Append(v T) { - q.Lock() - defer q.Unlock() - - q.data[q.writePos] = v - q.writePos++ - - if q.writePos == q.size { - q.writePos = 0 - } - - if q.count < q.size { - q.count++ - if q.count == q.size { - log.Warnf("About to start overwriting elements in buffer for %s", - q.name) - } - } else { - q.overwriteCount++ - q.readPos = q.writePos - } -} - -// ReadAll returns all elements from the FifoRingBuffer. -func (q *FifoRingBuffer[T]) ReadAll() []T { - q.Lock() - defer q.Unlock() - - data := make([]T, q.count) - readPos := q.readPos - - for i := uint32(0); i < q.count; i++ { - pos := (i + readPos) % q.size - data[i] = q.data[pos] - // Allow for element to be GCed - q.data[pos] = q.emptyT - } - - q.readPos = q.writePos - q.count = 0 - - return data -} - -func (q *FifoRingBuffer[T]) GetOverwriteCount() uint32 { - q.Lock() - defer q.Unlock() - - count := q.overwriteCount - q.overwriteCount = 0 - return count -} diff --git a/reporter/fifo_test.go b/reporter/fifo_test.go deleted file mode 100644 index 7a4c7a80..00000000 --- a/reporter/fifo_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package reporter - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestFifo(t *testing.T) { - var integers []int - integers = append(integers, 1, 2, 3, 4, 5) - - var integersShared []int - integersShared = append(integersShared, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12) - - var retIntegers []int - retIntegers = append(retIntegers, 3, 4, 5) - - var retIntegersShared []int - retIntegersShared = append(retIntegersShared, 8, 9, 10, 11, 12) - - sharedFifo := &FifoRingBuffer[int]{} - err := sharedFifo.InitFifo(5, t.Name()) - require.NoError(t, err) - - //nolint:lll - tests := map[string]struct { - // size defines the size of the fifo. - size uint32 - // data will be written to and extracted from the fifo. - data []int - // returned reflects the data that is expected from the fifo - // after writing to it. - returned []int - // the number of overwrites that occurred - overwriteCount uint32 - // err indicates if an error is expected for this testcase. - err bool - // sharedFifo indicates if a shared fifo should be used. - // If false, a new fifo is used, specific to the testcase. - sharedFifo bool - // parallel indicates if parallelism should be enabled for this testcase. - parallel bool - }{ - // This testcase simulates a fifo with an invalid size of 0. - "Invalid size": {size: 0, err: true, parallel: true}, - // This testcase simulates a case where the numbers of elements - // written to the fifo represents the size of the fifo. - "Full Fifo": {size: 5, data: integers, returned: integers, overwriteCount: 0, parallel: true}, - // This testcase simulates a case where the number of elements - // written to the fifo exceed the size of the fifo. - "Fifo overflow": {size: 3, data: integers, returned: retIntegers, overwriteCount: 2, parallel: true}, - // This testcase simulates a case where only a few elements are - // written to the fifo and don't exceed the size of the fifo. - "Partial full": {size: 15, data: integers, returned: integers, overwriteCount: 0, parallel: true}, - - // The following test cases share the same fifo - - // This testcase simulates a case where the numbers of elements - // written to the fifo represents the size of the fifo. - "Shared Full Fifo": {data: integers, returned: integers, overwriteCount: 0, sharedFifo: true}, - // This testcase simulates a case where the number of elements - // written to the fifo exceed the size of the fifo. - "Shared Fifo overflow": {data: integersShared, returned: retIntegersShared, overwriteCount: 7, sharedFifo: true}, - } - - for name, testcase := range tests { - name := name - testcase := testcase - var fifo *FifoRingBuffer[int] - - t.Run(name, func(t *testing.T) { - if testcase.parallel { - t.Parallel() - } - - if testcase.sharedFifo { - fifo = sharedFifo - } else { - fifo = &FifoRingBuffer[int]{} - err := fifo.InitFifo(testcase.size, t.Name()) - if testcase.err { - require.Error(t, err) - return - } - require.NoError(t, err) - } - - empty := fifo.ReadAll() - require.Empty(t, empty) - - for _, v := range testcase.data { - fifo.Append(v) - } - - data := fifo.ReadAll() - for i := uint32(0); i < fifo.size; i++ { - assert.Equalf(t, 0, fifo.data[i], "fifo not empty after ReadAll(), idx: %d", i) - } - assert.Equal(t, testcase.returned, data) - assert.Equal(t, testcase.overwriteCount, fifo.GetOverwriteCount(), "overwrite count") - assert.Zero(t, fifo.GetOverwriteCount(), "overwrite count not reset") - }) - } -} - -func TestFifo_isWritableWhenZeroed(t *testing.T) { - fifo := &FifoRingBuffer[int]{} - require.NoError(t, fifo.InitFifo(1, t.Name())) - fifo.zeroFifo() - assert.NotPanics(t, func() { - fifo.Append(123) - }) -} diff --git a/reporter/iface.go b/reporter/iface.go index 85e4a1b2..7cf19cf3 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -36,6 +36,7 @@ type TraceEventMeta struct { Comm string APMServiceName string PID, TID libpf.PID + CPU int } type TraceReporter interface { diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 35d6b056..c2fbdf29 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -4,14 +4,10 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( - "bufio" "context" "crypto/rand" "crypto/tls" - "fmt" "maps" - "os" - "regexp" "slices" "strconv" "time" @@ -33,8 +29,8 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf/xsync" ) -var ( - cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) +const ( + executableCacheLifetime = 1 * time.Hour ) // Assert that we implement the full Reporter interface. @@ -70,6 +66,7 @@ type traceAndMetaKey struct { apmServiceName string // containerID is annotated based on PID information containerID string + pid int64 } // traceEvents holds known information about a trace. @@ -84,9 +81,11 @@ type traceEvents struct { } // attrKeyValue is a helper to populate Profile.attribute_table. -type attrKeyValue struct { - key string - value string +type attrKeyValue[T string | int64] struct { + key string + // Set to true for OTel SemConv attributes with requirement level: Required + required bool + value T } // OTLPReporter receives and transforms information to be OTLP/profiles compliant. @@ -101,8 +100,8 @@ type OTLPReporter struct { // client for the connection to the receiver. client otlpcollector.ProfilesServiceClient - // stopSignal is the stop signal for shutting down all background tasks. - stopSignal chan libpf.Void + // runLoop handles the run loop + runLoop *runLoop // rpcStats stores gRPC related statistics. rpcStats *StatsHandlerImpl @@ -152,7 +151,7 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { if err != nil { return nil, err } - executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. + executables.SetLifetime(executableCacheLifetime) // Allow GC to clean stale items. frames, err := lru.NewSynced[libpf.FileID, *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]]( @@ -179,15 +178,17 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { } return &OTLPReporter{ - config: cfg, - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + runLoop: &runLoop{ + stopSignal: make(chan libpf.Void), + }, pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, client: nil, rpcStats: NewStatsHandler(), @@ -213,7 +214,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta traceEventsMap := r.traceEvents.WLock() defer r.traceEvents.WUnlock(&traceEventsMap) - containerID, err := r.lookupCgroupv2(meta.PID) + containerID, err := libpf.LookupCgroupv2(r.cgroupv2ID, meta.PID) if err != nil { log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v", meta.PID, err) @@ -224,6 +225,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta comm: meta.Comm, apmServiceName: meta.APMServiceName, containerID: containerID, + pid: int64(meta.PID), } if events, exists := (*traceEventsMap)[key]; exists { @@ -344,7 +346,7 @@ func (r *OTLPReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {} // Stop triggers a graceful shutdown of OTLPReporter. func (r *OTLPReporter) Stop() { - close(r.stopSignal) + r.runLoop.Stop() } // GetMetrics returns internal metrics of OTLPReporter. @@ -368,41 +370,27 @@ func (r *OTLPReporter) Start(ctx context.Context) error { otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() - close(r.stopSignal) + r.runLoop.Stop() return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) - go func() { - tick := time.NewTicker(r.config.ReportInterval) - defer tick.Stop() - purgeTick := time.NewTicker(5 * time.Minute) - defer purgeTick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-r.stopSignal: - return - case <-tick.C: - if err := r.reportOTLPProfile(ctx); err != nil { - log.Errorf("Request failed: %v", err) - } - tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) - case <-purgeTick.C: - // Allow the GC to purge expired entries to avoid memory leaks. - r.executables.PurgeExpired() - r.frames.PurgeExpired() - r.cgroupv2ID.PurgeExpired() - } + r.runLoop.Start(ctx, r.config.ReportInterval, func() { + if err := r.reportOTLPProfile(ctx); err != nil { + log.Errorf("Request failed: %v", err) } - }() + }, func() { + // Allow the GC to purge expired entries to avoid memory leaks. + r.executables.PurgeExpired() + r.frames.PurgeExpired() + r.cgroupv2ID.PurgeExpired() + }) // When Stop() is called and a signal to 'stop' is received, then: // - cancel the reporting functions currently running (using context) // - close the gRPC connection with collection-agent go func() { - <-r.stopSignal + <-r.runLoop.stopSignal cancelReporting() if err := otlpGrpcConn.Close(); err != nil { log.Fatalf("Stopping connection of OTLP client client failed: %v", err) @@ -566,7 +554,7 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u // Walk every frame of the trace. for i := range traceInfo.frameTypes { - frameAttributes := addProfileAttributes(profile, []attrKeyValue{ + frameAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ {key: "profile.frame.type", value: traceInfo.frameTypes[i].String()}, }, attributeMap) @@ -590,7 +578,9 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u fileIDtoMapping[traceInfo.files[i]] = idx locationMappingIndex = idx - execInfo, exists := r.executables.Get(traceInfo.files[i]) + // Ensure that actively used executables do not expire. + execInfo, exists := r.executables.GetAndRefresh(traceInfo.files[i], + executableCacheLifetime) // Next step: Select a proper default value, // if the name of the executable is not known yet. @@ -599,12 +589,12 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u fileName = execInfo.fileName } - mappingAttributes := addProfileAttributes(profile, []attrKeyValue{ + mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ // Once SemConv and its Go package is released with the new // semantic convention for build_id, replace these hard coded // strings. {key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID}, - {key: "process.executable.build_id.profiling", + {key: "process.executable.build_id.htlhash", value: traceInfo.files[i].StringNoQuotes()}, }, attributeMap) @@ -658,17 +648,19 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u loc.Line = append(loc.Line, line) // To be compliant with the protocol, generate a dummy mapping entry. - loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, stringMap, - profile, traceInfo.files[i]) + loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, + stringMap, attributeMap, profile, traceInfo.files[i]) } profile.Location = append(profile.Location, loc) } - sample.Attributes = addProfileAttributes(profile, []attrKeyValue{ + sample.Attributes = append(addProfileAttributes(profile, []attrKeyValue[string]{ {key: string(semconv.ContainerIDKey), value: traceKey.containerID}, {key: string(semconv.ThreadNameKey), value: traceKey.comm}, {key: string(semconv.ServiceNameKey), value: traceKey.apmServiceName}, - }, attributeMap) + }, attributeMap), addProfileAttributes(profile, []attrKeyValue[int64]{ + {key: string(semconv.ProcessPIDKey), value: traceKey.pid}, + }, attributeMap)...) sample.LocationsLength = uint64(len(traceInfo.frameTypes)) locationIndex += sample.LocationsLength @@ -739,15 +731,29 @@ func createFunctionEntry(funcMap map[funcInfo]uint64, // addProfileAttributes adds attributes to Profile.attribute_table and returns // the indices to these attributes. -func addProfileAttributes(profile *profiles.Profile, - attributes []attrKeyValue, attributeMap map[string]uint64) []uint64 { +func addProfileAttributes[T string | int64](profile *profiles.Profile, + attributes []attrKeyValue[T], attributeMap map[string]uint64) []uint64 { indices := make([]uint64, 0, len(attributes)) - addAttr := func(attr attrKeyValue) { - if attr.value == "" { + addAttr := func(attr attrKeyValue[T]) { + var attributeCompositeKey string + var attributeValue common.AnyValue + + switch val := any(attr.value).(type) { + case string: + if !attr.required && val == "" { + return + } + attributeCompositeKey = attr.key + "_" + val + attributeValue = common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: val}} + case int64: + attributeCompositeKey = attr.key + "_" + strconv.Itoa(int(val)) + attributeValue = common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: val}} + default: + log.Error("Unsupported attribute value type. Only string and int64 are supported.") return } - attributeCompositeKey := attr.key + "_" + attr.value + if attributeIndex, exists := attributeMap[attributeCompositeKey]; exists { indices = append(indices, attributeIndex) return @@ -756,7 +762,7 @@ func addProfileAttributes(profile *profiles.Profile, indices = append(indices, newIndex) profile.AttributeTable = append(profile.AttributeTable, &common.KeyValue{ Key: attr.key, - Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: attr.value}}, + Value: &attributeValue, }) attributeMap[attributeCompositeKey] = newIndex } @@ -770,24 +776,23 @@ func addProfileAttributes(profile *profiles.Profile, // getDummyMappingIndex inserts or looks up an entry for interpreted FileIDs. func getDummyMappingIndex(fileIDtoMapping map[libpf.FileID]uint64, - stringMap map[string]uint32, profile *profiles.Profile, - fileID libpf.FileID) uint64 { - var locationMappingIndex uint64 + stringMap map[string]uint32, attributeMap map[string]uint64, + profile *profiles.Profile, fileID libpf.FileID) uint64 { if tmpMappingIndex, exists := fileIDtoMapping[fileID]; exists { - locationMappingIndex = tmpMappingIndex - } else { - idx := uint64(len(fileIDtoMapping)) - fileIDtoMapping[fileID] = idx - locationMappingIndex = idx - - profile.Mapping = append(profile.Mapping, &profiles.Mapping{ - Filename: int64(getStringMapIndex(stringMap, "")), - BuildId: int64(getStringMapIndex(stringMap, - fileID.StringNoQuotes())), - BuildIdKind: *profiles.BuildIdKind_BUILD_ID_BINARY_HASH.Enum(), - }) + return tmpMappingIndex } - return locationMappingIndex + idx := uint64(len(fileIDtoMapping)) + fileIDtoMapping[fileID] = idx + + mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ + {key: "process.executable.build_id.htlhash", + value: fileID.StringNoQuotes()}}, attributeMap) + + profile.Mapping = append(profile.Mapping, &profiles.Mapping{ + Filename: int64(getStringMapIndex(stringMap, "")), + Attributes: mappingAttributes, + }) + return idx } // waitGrpcEndpoint waits until the gRPC connection is established. @@ -852,44 +857,3 @@ func setupGrpcConnection(parent context.Context, cfg *Config, //nolint:staticcheck return grpc.DialContext(ctx, cfg.CollAgentAddr, opts...) } - -// lookupCgroupv2 returns the cgroupv2 ID for pid. -func (r *OTLPReporter) lookupCgroupv2(pid libpf.PID) (string, error) { - id, ok := r.cgroupv2ID.Get(pid) - if ok { - return id, nil - } - - // Slow path - f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) - if err != nil { - return "", err - } - defer f.Close() - - var genericCgroupv2 string - scanner := bufio.NewScanner(f) - buf := make([]byte, 512) - // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). - // We can do that and also set a maximum allocation size on the following call. - // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't - // expect lines in /proc//cgroup to be longer than that. - scanner.Buffer(buf, 8192) - var pathParts []string - for scanner.Scan() { - line := scanner.Text() - pathParts = cgroupv2PathPattern.FindStringSubmatch(line) - if pathParts == nil { - log.Debugf("Could not extract cgroupv2 path from line: %s", line) - continue - } - genericCgroupv2 = pathParts[1] - break - } - - // Cache the cgroupv2 information. - // To avoid busy lookups, also empty cgroupv2 information is cached. - r.cgroupv2ID.Add(pid, genericCgroupv2) - - return genericCgroupv2, nil -} diff --git a/reporter/otlp_reporter_test.go b/reporter/otlp_reporter_test.go index d9c2844f..42c64d04 100644 --- a/reporter/otlp_reporter_test.go +++ b/reporter/otlp_reporter_test.go @@ -26,11 +26,19 @@ func TestGetSampleAttributes(t *testing.T) { comm: "", apmServiceName: "", containerID: "", + pid: 0, + }, + }, + attributeMap: make(map[string]uint64), + expectedIndices: [][]uint64{{0}}, + expectedAttributeTable: []*common.KeyValue{ + { + Key: "process.pid", + Value: &common.AnyValue{ + Value: &common.AnyValue_IntValue{IntValue: 0}, + }, }, }, - attributeMap: make(map[string]uint64), - expectedIndices: [][]uint64{make([]uint64, 0, 4)}, - expectedAttributeTable: nil, }, "duplicate": { profile: &profiles.Profile{}, @@ -40,16 +48,18 @@ func TestGetSampleAttributes(t *testing.T) { comm: "comm1", apmServiceName: "apmServiceName1", containerID: "containerID1", + pid: 1234, }, { hash: libpf.TraceHash{}, comm: "comm1", apmServiceName: "apmServiceName1", containerID: "containerID1", + pid: 1234, }, }, attributeMap: make(map[string]uint64), - expectedIndices: [][]uint64{{0, 1, 2}, {0, 1, 2}}, + expectedIndices: [][]uint64{{0, 1, 2, 3}, {0, 1, 2, 3}}, expectedAttributeTable: []*common.KeyValue{ { Key: "container.id", @@ -69,6 +79,12 @@ func TestGetSampleAttributes(t *testing.T) { Value: &common.AnyValue_StringValue{StringValue: "apmServiceName1"}, }, }, + { + Key: "process.pid", + Value: &common.AnyValue{ + Value: &common.AnyValue_IntValue{IntValue: 1234}, + }, + }, }, }, "different": { @@ -79,16 +95,18 @@ func TestGetSampleAttributes(t *testing.T) { comm: "comm1", apmServiceName: "apmServiceName1", containerID: "containerID1", + pid: 1234, }, { hash: libpf.TraceHash{}, comm: "comm2", apmServiceName: "apmServiceName2", containerID: "containerID2", + pid: 6789, }, }, attributeMap: make(map[string]uint64), - expectedIndices: [][]uint64{{0, 1, 2}, {3, 4, 5}}, + expectedIndices: [][]uint64{{0, 1, 2, 3}, {4, 5, 6, 7}}, expectedAttributeTable: []*common.KeyValue{ { Key: "container.id", @@ -108,6 +126,12 @@ func TestGetSampleAttributes(t *testing.T) { Value: &common.AnyValue_StringValue{StringValue: "apmServiceName1"}, }, }, + { + Key: "process.pid", + Value: &common.AnyValue{ + Value: &common.AnyValue_IntValue{IntValue: 1234}, + }, + }, { Key: "container.id", Value: &common.AnyValue{ @@ -126,6 +150,12 @@ func TestGetSampleAttributes(t *testing.T) { Value: &common.AnyValue_StringValue{StringValue: "apmServiceName2"}, }, }, + { + Key: "process.pid", + Value: &common.AnyValue{ + Value: &common.AnyValue_IntValue{IntValue: 6789}, + }, + }, }, }, } @@ -136,11 +166,16 @@ func TestGetSampleAttributes(t *testing.T) { t.Run(name, func(t *testing.T) { indices := make([][]uint64, 0) for _, k := range tc.k { - indices = append(indices, addProfileAttributes(tc.profile, []attrKeyValue{ - {key: string(semconv.ContainerIDKey), value: k.containerID}, - {key: string(semconv.ThreadNameKey), value: k.comm}, - {key: string(semconv.ServiceNameKey), value: k.apmServiceName}, - }, tc.attributeMap)) + indices = append(indices, append(addProfileAttributes(tc.profile, + []attrKeyValue[string]{ + {key: string(semconv.ContainerIDKey), value: k.containerID}, + {key: string(semconv.ThreadNameKey), value: k.comm}, + {key: string(semconv.ServiceNameKey), value: k.apmServiceName}, + }, tc.attributeMap), + addProfileAttributes(tc.profile, + []attrKeyValue[int64]{ + {key: string(semconv.ProcessPIDKey), value: k.pid}, + }, tc.attributeMap)...)) } require.Equal(t, tc.expectedIndices, indices) require.Equal(t, tc.expectedAttributeTable, tc.profile.AttributeTable) diff --git a/reporter/runloop.go b/reporter/runloop.go new file mode 100644 index 00000000..0014d634 --- /dev/null +++ b/reporter/runloop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" + +import ( + "context" + "time" + + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// runLoop implements the run loop for all reporters +type runLoop struct { + // stopSignal is the stop signal for shutting down all background tasks. + stopSignal chan libpf.Void +} + +func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { + go func() { + tick := time.NewTicker(reportInterval) + defer tick.Stop() + purgeTick := time.NewTicker(5 * time.Minute) + defer purgeTick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-rl.stopSignal: + return + case <-tick.C: + run() + tick.Reset(libpf.AddJitter(reportInterval, 0.2)) + case <-purgeTick.C: + purge() + } + } + }() +} + +func (rl *runLoop) Stop() { + close(rl.stopSignal) +} diff --git a/stringutil/stringutil.go b/stringutil/stringutil.go index 3d8b4a88..06a19988 100644 --- a/stringutil/stringutil.go +++ b/stringutil/stringutil.go @@ -84,8 +84,3 @@ func SplitN(s, sep string, f []string) int { func ByteSlice2String(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } - -// StrDataPtr returns the string's underlying Data pointer through reflection. -func StrDataPtr(s string) *byte { - return unsafe.StringData(s) -} diff --git a/support/ebpf/dotnet_tracer.ebpf.c b/support/ebpf/dotnet_tracer.ebpf.c index c36dd294..7ebac864 100644 --- a/support/ebpf/dotnet_tracer.ebpf.c +++ b/support/ebpf/dotnet_tracer.ebpf.c @@ -170,7 +170,7 @@ ErrorCode unwind_one_dotnet_frame(PerCPURecord *record, DotnetProcInfo *vi, bool u64 code_start = state->text_section_bias; u64 code_header_ptr = pc; - state->return_address = true; + unwinder_mark_nonleaf_frame(state); if (type < 0x100 && (type & DOTNET_CODE_FLAG_LEAF)) { // Stub frame that does not do calls. diff --git a/support/ebpf/extmaps.h b/support/ebpf/extmaps.h index 76a83f57..5922d9ed 100644 --- a/support/ebpf/extmaps.h +++ b/support/ebpf/extmaps.h @@ -38,6 +38,8 @@ extern bpf_map_def exe_id_to_18_stack_deltas; extern bpf_map_def exe_id_to_19_stack_deltas; extern bpf_map_def exe_id_to_20_stack_deltas; extern bpf_map_def exe_id_to_21_stack_deltas; +extern bpf_map_def exe_id_to_22_stack_deltas; +extern bpf_map_def exe_id_to_23_stack_deltas; extern bpf_map_def hotspot_procs; extern bpf_map_def kernel_stackmap; extern bpf_map_def dotnet_procs; diff --git a/support/ebpf/hotspot_tracer.ebpf.c b/support/ebpf/hotspot_tracer.ebpf.c index c63f14d6..9ae1fdc7 100644 --- a/support/ebpf/hotspot_tracer.ebpf.c +++ b/support/ebpf/hotspot_tracer.ebpf.c @@ -719,7 +719,7 @@ ErrorCode hotspot_execute_unwind_action(CodeBlobInfo *cbi, HotspotUnwindAction a return ERR_UNREACHABLE; #if defined(__aarch64__) case UA_UNWIND_AARCH64_LR: - if (state->return_address) { + if (state->lr_invalid) { increment_metric(metricID_UnwindHotspotErrLrUnwindingMidTrace); return ERR_HOTSPOT_LR_UNWINDING_MID_TRACE; } @@ -761,7 +761,7 @@ ErrorCode hotspot_execute_unwind_action(CodeBlobInfo *cbi, HotspotUnwindAction a state->pc = ui->pc; state->sp = ui->sp; state->fp = ui->fp; - state->return_address = true; + unwinder_mark_nonleaf_frame(state); increment_metric(metricID_UnwindHotspotFrames); } } diff --git a/support/ebpf/native_stack_trace.ebpf.c b/support/ebpf/native_stack_trace.ebpf.c index d9c56f3b..959099cb 100644 --- a/support/ebpf/native_stack_trace.ebpf.c +++ b/support/ebpf/native_stack_trace.ebpf.c @@ -37,6 +37,8 @@ STACK_DELTA_BUCKET(18); STACK_DELTA_BUCKET(19); STACK_DELTA_BUCKET(20); STACK_DELTA_BUCKET(21); +STACK_DELTA_BUCKET(22); +STACK_DELTA_BUCKET(23); // Unwind info value for invalid stack delta #define STACK_DELTA_INVALID (STACK_DELTA_COMMAND_FLAG | UNWIND_COMMAND_INVALID) @@ -154,6 +156,8 @@ void *get_stack_delta_map(int mapID) { case 19: return &exe_id_to_19_stack_deltas; case 20: return &exe_id_to_20_stack_deltas; case 21: return &exe_id_to_21_stack_deltas; + case 22: return &exe_id_to_22_stack_deltas; + case 23: return &exe_id_to_23_stack_deltas; default: return NULL; } } @@ -472,7 +476,7 @@ static ErrorCode unwind_one_frame(u64 pid, u32 frame_idx, UnwindState *state, bo return ERR_NATIVE_PC_READ; } state->sp = cfa; - state->return_address = true; + unwinder_mark_nonleaf_frame(state); frame_ok: increment_metric(metricID_UnwindNativeFrames); return ERR_OK; @@ -512,6 +516,7 @@ static ErrorCode unwind_one_frame(u64 pid, u32 frame_idx, struct UnwindState *st state->lr = normalize_pac_ptr(rt_regs[30]); state->r22 = rt_regs[22]; state->return_address = false; + state->lr_invalid = false; DEBUG_PRINT("signal frame"); goto frame_ok; case UNWIND_COMMAND_STOP: @@ -548,7 +553,7 @@ static ErrorCode unwind_one_frame(u64 pid, u32 frame_idx, struct UnwindState *st if (info->fpOpcode == UNWIND_OPCODE_BASE_LR) { // Allow LR unwinding only if it's known to be valid: either because // it's the topmost user-mode frame, or recovered by signal trampoline. - if (state->return_address) { + if (state->lr_invalid) { increment_metric(metricID_UnwindNativeErrLrUnwindingMidTrace); return ERR_NATIVE_LR_UNWINDING_MID_TRACE; } @@ -593,7 +598,7 @@ static ErrorCode unwind_one_frame(u64 pid, u32 frame_idx, struct UnwindState *st } state->sp = cfa; - state->return_address = true; + unwinder_mark_nonleaf_frame(state); frame_ok: increment_metric(metricID_UnwindNativeFrames); return ERR_OK; @@ -642,7 +647,12 @@ static inline ErrorCode copy_state_regs(UnwindState *state, // Treat syscalls as return addresses, but not IRQ handling, page faults, etc.. // https://github.com/torvalds/linux/blob/2ef5971ff3/arch/arm64/include/asm/ptrace.h#L118 // https://github.com/torvalds/linux/blob/2ef5971ff3/arch/arm64/include/asm/ptrace.h#L206-L209 + // + // Note: We do not use `unwinder_mark_nonleaf_frame` here, + // because the frame is a leaf frame from the perspective of the user stack, + // regardless of whether we are in a syscall. state->return_address = interrupted_kernelmode && regs->syscallno != -1; + state->lr_invalid = false; #endif return ERR_OK; diff --git a/support/ebpf/tracemgmt.h b/support/ebpf/tracemgmt.h index d2c58d53..c0e08a45 100644 --- a/support/ebpf/tracemgmt.h +++ b/support/ebpf/tracemgmt.h @@ -200,6 +200,7 @@ static inline PerCPURecord *get_pristine_per_cpu_record() #elif defined(__aarch64__) record->state.lr = 0; record->state.r22 = 0; + record->state.lr_invalid = false; #endif record->state.return_address = false; record->state.error_metric = -1; @@ -242,6 +243,25 @@ void unwinder_mark_done(PerCPURecord *record, int unwinder) { record->unwindersDone |= 1U << unwinder; } +// unwinder_mark_nonleaf_frame marks the current frame as a non-leaf +// frame from the perspective of the user-mode stack. +// That is, frames that are making a syscall (thus the leaf for the user-mode +// stack, though not the leaf for the entire logical stack) *are* +// considered leaf frames in this sense. +// +// On both x86 and aarch64, this means we need to subtract 1 from +// the address during later processing. +// +// Additionally, on aarch64, this means that we will not trust the current value of +// `lr` to be the return address for this frame. +static inline __attribute__((__always_inline__)) +void unwinder_mark_nonleaf_frame(UnwindState *state) { + state->return_address = true; +#if defined(__aarch64__) + state->lr_invalid = true; +#endif +} + // Push the file ID, line number and frame type into FrameList with a user-defined // maximum stack size. // diff --git a/support/ebpf/tracer.ebpf.release.amd64 b/support/ebpf/tracer.ebpf.release.amd64 index 9ff3eabb..2a8cbb2f 100644 Binary files a/support/ebpf/tracer.ebpf.release.amd64 and b/support/ebpf/tracer.ebpf.release.amd64 differ diff --git a/support/ebpf/tracer.ebpf.release.arm64 b/support/ebpf/tracer.ebpf.release.arm64 index dca6175e..ecac9895 100644 Binary files a/support/ebpf/tracer.ebpf.release.arm64 and b/support/ebpf/tracer.ebpf.release.arm64 differ diff --git a/support/ebpf/types.h b/support/ebpf/types.h index f161a284..e5592ff8 100644 --- a/support/ebpf/types.h +++ b/support/ebpf/types.h @@ -571,8 +571,19 @@ typedef struct UnwindState { // Set if the PC is a return address. That is, it points to the next instruction // after a CALL instruction, and requires to be adjusted during symbolization. - // On aarch64, this additionally means that LR register can not be used. + // + // Consider calling unwinder_mark_nonleaf_frame rather than setting this directly. bool return_address; + +#if defined(__aarch64__) + // On aarch64, whether to forbid LR-based unwinding. + // LR unwinding is only allowed for leaf user-mode frames. Frames making a syscall + // are also considered leaf frames for this purpose, because LR is preserved across + // syscalls. + // + // Consider calling unwinder_mark_nonleaf_frame rather than setting this directly. + bool lr_invalid; +#endif } UnwindState; // Container for unwinding state needed by the Perl unwinder. Keeping track of @@ -820,8 +831,8 @@ void decode_bias_and_unwind_program(u64 bias_and_unwind_program, u64* bias, int* // Smallest stack delta bucket that holds up to 2^8 entries #define STACK_DELTA_BUCKET_SMALLEST 8 -// Largest stack delta bucket that holds up to 2^21 entries -#define STACK_DELTA_BUCKET_LARGEST 21 +// Largest stack delta bucket that holds up to 2^23 entries +#define STACK_DELTA_BUCKET_LARGEST 23 // Struct of the `system_config` map. Contains various configuration variables // determined and set by the host agent. diff --git a/support/ebpf/v8_tracer.ebpf.c b/support/ebpf/v8_tracer.ebpf.c index 9928b51c..d6da2a71 100644 --- a/support/ebpf/v8_tracer.ebpf.c +++ b/support/ebpf/v8_tracer.ebpf.c @@ -271,7 +271,7 @@ ErrorCode unwind_one_v8_frame(PerCPURecord *record, V8ProcInfo *vi, bool top) { state->sp = fp + sizeof(regs); state->fp = regs[0]; state->pc = regs[1]; - state->return_address = true; + unwinder_mark_nonleaf_frame(state); DEBUG_PRINT("v8: pc: %lx, sp: %lx, fp: %lx", (unsigned long) state->pc, (unsigned long) state->sp, diff --git a/tools/coredump/ebpfhelpers.go b/tools/coredump/ebpfhelpers.go index 3f3ef07a..f4088917 100644 --- a/tools/coredump/ebpfhelpers.go +++ b/tools/coredump/ebpfhelpers.go @@ -114,7 +114,8 @@ func __bpf_map_lookup_elem(id C.u64, mapdef *C.bpf_map_def, keyptr unsafe.Pointe &C.exe_id_to_11_stack_deltas, &C.exe_id_to_12_stack_deltas, &C.exe_id_to_13_stack_deltas, &C.exe_id_to_14_stack_deltas, &C.exe_id_to_15_stack_deltas, &C.exe_id_to_16_stack_deltas, &C.exe_id_to_17_stack_deltas, &C.exe_id_to_18_stack_deltas, &C.exe_id_to_19_stack_deltas, - &C.exe_id_to_20_stack_deltas, &C.exe_id_to_21_stack_deltas: + &C.exe_id_to_20_stack_deltas, &C.exe_id_to_21_stack_deltas, &C.exe_id_to_22_stack_deltas, + &C.exe_id_to_23_stack_deltas: ctx.stackDeltaFileID = *(*C.u64)(keyptr) return unsafe.Pointer(stackDeltaInnerMap) case &C.unwind_info_array: diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index c1a6e391..ebc39ac0 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -126,6 +126,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { PID: bpfTrace.PID, TID: bpfTrace.TID, APMServiceName: "", // filled in below + CPU: bpfTrace.CPU, } if !m.reporter.SupportsReportTraceEvent() { diff --git a/tracer/events.go b/tracer/events.go index 0a29a9c4..7e21fe56 100644 --- a/tracer/events.go +++ b/tracer/events.go @@ -134,7 +134,7 @@ func startPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, // calls. Returns a function that can be called to retrieve perf event array // error counts. func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, - pollFrequency time.Duration, perCPUBufferSize int, triggerFunc func([]byte), + pollFrequency time.Duration, perCPUBufferSize int, triggerFunc func([]byte, int), ) func() (lost, noData, readError uint64) { eventReader, err := perf.NewReader(perfEventMap, perCPUBufferSize) if err != nil { @@ -178,7 +178,7 @@ func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, noDataCount.Add(1) continue } - triggerFunc(data.RawSample) + triggerFunc(data.RawSample, data.CPU) } } }() diff --git a/tracer/tracer.go b/tracer/tracer.go index 2da8bff5..78067738 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -841,7 +841,7 @@ func (t *Tracer) eBPFMetricsCollector( // // If the raw trace contains a kernel stack ID, the kernel stack is also // retrieved and inserted at the appropriate position. -func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace { +func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace { frameListOffs := int(unsafe.Offsetof(C.Trace{}.frames)) if len(raw) < frameListOffs { @@ -863,6 +863,7 @@ func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace { PID: libpf.PID(ptr.pid), TID: libpf.PID(ptr.tid), KTime: times.KTime(ptr.ktime), + CPU: cpu, } // Trace fields included in the hash: @@ -912,8 +913,8 @@ func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan *host.T eventMetricCollector := t.startEventMonitor(ctx) startPollingPerfEventMonitor(ctx, t.ebpfMaps["trace_events"], t.intervals.TracePollInterval(), - t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte) { - traceOutChan <- t.loadBpfTrace(rawTrace) + t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte, cpu int) { + traceOutChan <- t.loadBpfTrace(rawTrace, cpu) }) pidEvents := make([]uint32, 0)