[pull] main from DataDog:main #66

Merged: 7 commits, Dec 2, 2024
1 change: 1 addition & 0 deletions .gitlab/e2e/e2e.yml
@@ -277,6 +277,7 @@ new-e2e-aml:
- deploy_deb_testing-a7_x64
- deploy_windows_testing-a7
- qa_agent
- qa_agent_jmx
- qa_dca
rules:
- !reference [.on_aml_or_e2e_changes]
3 changes: 3 additions & 0 deletions .run/README.md
@@ -0,0 +1,3 @@
# IntelliJ GoLand out-of-the-box configuration

This folder contains scripts and tasks for IntelliJ GoLand to build and run the agent and its sub-processes.
3 changes: 2 additions & 1 deletion comp/core/tagger/impl-dual/dual.go
@@ -54,7 +54,8 @@ func NewComponent(req Requires) (Provides, error) {

return Provides{
local.Provides{
Comp: provide.Comp,
Comp: provide.Comp,
Endpoint: provide.Endpoint,
},
}, nil
}
23 changes: 20 additions & 3 deletions comp/core/tagger/impl-remote/remote.go
@@ -9,8 +9,10 @@ package remotetaggerimpl
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"github.com/cenkalti/backoff"
@@ -21,6 +23,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"

api "github.com/DataDog/datadog-agent/comp/api/api/def"
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
taggercommon "github.com/DataDog/datadog-agent/comp/core/tagger/common"
@@ -35,6 +38,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/datadog-agent/pkg/util/common"
grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc"
httputils "github.com/DataDog/datadog-agent/pkg/util/http"
)

const (
@@ -59,7 +63,8 @@ type Requires struct {
type Provides struct {
compdef.Out

Comp tagger.Component
Comp tagger.Component
Endpoint api.AgentEndpointProvider
}

type remoteTagger struct {
@@ -112,11 +117,12 @@ func NewComponent(req Requires) (Provides, error) {
}})

return Provides{
Comp: remoteTagger,
Comp: remoteTagger,
Endpoint: api.NewAgentEndpointProvider(remoteTagger.writeList, "/tagger-list", "GET"),
}, nil
}

func newRemoteTagger(params tagger.RemoteParams, cfg config.Component, log log.Component, telemetryComp coretelemetry.Component) (tagger.Component, error) {
func newRemoteTagger(params tagger.RemoteParams, cfg config.Component, log log.Component, telemetryComp coretelemetry.Component) (*remoteTagger, error) {
telemetryStore := telemetry.NewStore(telemetryComp)

target, err := params.RemoteTarget(cfg)
@@ -494,6 +500,17 @@ func (t *remoteTagger) startTaggerStream(maxElapsed time.Duration) error {
}, expBackoff)
}

func (t *remoteTagger) writeList(w http.ResponseWriter, _ *http.Request) {
response := t.List()

jsonTags, err := json.Marshal(response)
if err != nil {
httputils.SetJSONError(w, t.log.Errorf("Unable to marshal tagger list response: %s", err), 500)
return
}
w.Write(jsonTags)
}

func convertEventType(t pb.EventType) (types.EventType, error) {
switch t {
case pb.EventType_ADDED:
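For context, a minimal Go sketch of how a client could exercise the new GET /tagger-list endpoint registered above and decode the JSON written by writeList. The agent address is a placeholder and any authentication is omitted; only the route, method, and types.TaggerListResponse come from this diff.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/DataDog/datadog-agent/comp/core/tagger/types"
)

// fetchTaggerList queries the /tagger-list route added by this change and
// decodes the JSON payload produced by remoteTagger.writeList.
func fetchTaggerList(baseURL string) (*types.TaggerListResponse, error) {
	resp, err := http.Get(baseURL + "/tagger-list")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}

	var out types.TaggerListResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	// The address below is an assumption for illustration, not part of the diff.
	list, err := fetchTaggerList("http://localhost:5001")
	if err != nil {
		panic(err)
	}
	fmt.Printf("tagger entities: %d\n", len(list.Entities))
}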
44 changes: 44 additions & 0 deletions comp/core/tagger/impl-remote/remote_test.go
@@ -7,17 +7,22 @@ package remotetaggerimpl

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core/config"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
nooptelemetry "github.com/DataDog/datadog-agent/comp/core/telemetry/noopsimpl"
compdef "github.com/DataDog/datadog-agent/comp/def"
configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
)
@@ -72,3 +77,42 @@ func TestStartDoNotBlockIfServerIsNotAvailable(t *testing.T) {
require.NoError(t, err)
remoteTagger.Stop()
}

func TestNewComponentSetsTaggerListEndpoint(t *testing.T) {
req := Requires{
Lc: compdef.NewTestLifecycle(t),
Config: configmock.New(t),
Log: logmock.New(t),
Params: tagger.RemoteParams{
RemoteTarget: func(config.Component) (string, error) { return ":5001", nil },
RemoteTokenFetcher: func(config.Component) func() (string, error) {
return func() (string, error) {
return "something", nil
}
},
},
Telemetry: nooptelemetry.GetCompatComponent(),
}
provides, err := NewComponent(req)
require.NoError(t, err)

endpointProvider := provides.Endpoint.Provider

assert.Equal(t, []string{"GET"}, endpointProvider.Methods())
assert.Equal(t, "/tagger-list", endpointProvider.Route())

// Create a test server with the endpoint handler
server := httptest.NewServer(endpointProvider.HandlerFunc())
defer server.Close()

// Make a request to the endpoint
resp, err := http.Get(server.URL + "/tagger-list")
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)

var response types.TaggerListResponse
err = json.NewDecoder(resp.Body).Decode(&response)
require.NoError(t, err)
assert.NotNil(t, response.Entities)
}
53 changes: 17 additions & 36 deletions pkg/gpu/probe_test.go
@@ -11,18 +11,15 @@ import (
"testing"
"time"

"golang.org/x/exp/maps"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/gpu/model"
"github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest"
consumerstestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
"github.com/DataDog/datadog-agent/pkg/gpu/config"
"github.com/DataDog/datadog-agent/pkg/gpu/testutil"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/util/testutil/flake"
)

type probeTestSuite struct {
@@ -72,9 +69,8 @@ func (s *probeTestSuite) TestCanReceiveEvents() {
t := s.T()

probe := s.getProbe()
cmd := testutil.RunSample(t, testutil.CudaSample)

utils.WaitForProgramsToBeTraced(t, gpuModuleName, gpuAttacherName, cmd.Process.Pid, utils.ManualTracingFallbackDisabled)
cmd, err := testutil.RunSample(t, testutil.CudaSample)
require.NoError(t, err)

var handlerStream, handlerGlobal *StreamHandler
require.Eventually(t, func() bool {
@@ -89,7 +85,7 @@
}

return handlerStream != nil && handlerGlobal != nil && len(handlerStream.kernelSpans) > 0 && len(handlerGlobal.allocations) > 0
}, 10*time.Second, 500*time.Millisecond, "stream and global handlers not found: existing is %v", probe.consumer.streamHandlers)
}, 3*time.Second, 100*time.Millisecond, "stream and global handlers not found: existing is %v", probe.consumer.streamHandlers)

// Check device assignments
require.Contains(t, probe.consumer.sysCtx.selectedDeviceByPIDAndTID, cmd.Process.Pid)
@@ -115,15 +111,14 @@ func (s *probeTestSuite) TestCanGenerateStats() {

probe := s.getProbe()

cmd := testutil.RunSample(t, testutil.CudaSample)

utils.WaitForProgramsToBeTraced(t, gpuModuleName, gpuAttacherName, cmd.Process.Pid, utils.ManualTracingFallbackDisabled)
cmd, err := testutil.RunSample(t, testutil.CudaSample)
require.NoError(t, err)

// Wait until the process finishes and we can get the stats. Run this instead of waiting for the process to finish
// so that we can time out correctly
//TODO: change this check to count telemetry counter of the consumer (once added).
// we are expecting 2 different streamhandlers because cudasample generates 3 events in total for 2 different streams (stream 0 and stream 30)
require.Eventually(t, func() bool {
return !utils.IsProgramTraced(gpuModuleName, gpuAttacherName, cmd.Process.Pid)
}, 20*time.Second, 500*time.Millisecond, "process not stopped")
return len(probe.consumer.streamHandlers) == 2
}, 3*time.Second, 100*time.Millisecond, "stream handlers count mismatch: expected: 2, got: %d", len(probe.consumer.streamHandlers))

stats, err := probe.GetAndFlush()
require.NoError(t, err)
@@ -145,21 +140,20 @@ func (s *probeTestSuite) TestMultiGPUSupport() {

sampleArgs := testutil.SampleArgs{
StartWaitTimeSec: 6, // default wait time for WaitForProgramsToBeTraced is 5 seconds, give margin to attach manually to avoid flakes
EndWaitTimeSec: 1, // We need the process to stay active a bit so we can inspect its environment variables, if it ends too quickly we get no information
CudaVisibleDevicesEnv: "1,2",
SelectedDevice: 1,
}
// Visible devices 1,2 -> selects 1 in that array -> global device index = 2
selectedGPU := testutil.GPUUUIDs[2]

cmd := testutil.RunSampleWithArgs(t, testutil.CudaSample, sampleArgs)
utils.WaitForProgramsToBeTraced(t, gpuModuleName, gpuAttacherName, cmd.Process.Pid, utils.ManualTracingFallbackEnabled)
cmd, err := testutil.RunSampleWithArgs(t, testutil.CudaSample, sampleArgs)
require.NoError(t, err)

// Wait until the process finishes and we can get the stats. Run this instead of waiting for the process to finish
// so that we can time out correctly
//TODO: change this check to count telemetry counter of the consumer (once added).
// we are expecting 2 different streamhandlers because cudasample generates 3 events in total for 2 different streams (stream 0 and stream 30)
require.Eventually(t, func() bool {
return !utils.IsProgramTraced(gpuModuleName, gpuAttacherName, cmd.Process.Pid)
}, 60*time.Second, 500*time.Millisecond, "process not stopped")
return len(probe.consumer.streamHandlers) == 2
}, 3*time.Second, 100*time.Millisecond, "stream handlers count mismatch: expected: 2, got: %d", len(probe.consumer.streamHandlers))

stats, err := probe.GetAndFlush()
require.NoError(t, err)
@@ -175,22 +169,9 @@ func (s *probeTestSuite) TestDetectsContainer() {
func (s *probeTestSuite) TestDetectsContainer() {
t := s.T()

// Flaky test in CI, avoid failures on main for now.
flake.Mark(t)

probe := s.getProbe()

args := testutil.GetDefaultArgs()
args.EndWaitTimeSec = 1
pid, cid := testutil.RunSampleInDockerWithArgs(t, testutil.CudaSample, testutil.MinimalDockerImage, args)

utils.WaitForProgramsToBeTraced(t, gpuModuleName, gpuAttacherName, pid, utils.ManualTracingFallbackDisabled)

// Wait until the process finishes and we can get the stats. Run this instead of waiting for the process to finish
// so that we can time out correctly
require.Eventually(t, func() bool {
return !utils.IsProgramTraced(gpuModuleName, gpuAttacherName, pid)
}, 20*time.Second, 500*time.Millisecond, "process not stopped")
pid, cid := testutil.RunSampleInDocker(t, testutil.CudaSample, testutil.MinimalDockerImage)

// Check that the stream handlers have the correct container ID assigned
for key, handler := range probe.consumer.streamHandlers {
19 changes: 10 additions & 9 deletions pkg/gpu/testdata/cudasample.c
@@ -39,15 +39,16 @@ cudaError_t cudaSetDevice(int device) {
int main(int argc, char **argv) {
cudaStream_t stream = 30;

if (argc != 4) {
fprintf(stderr, "Usage: %s <wait-to-start-sec> <wait-to-end-sec> <device-index>\n", argv[0]);
if (argc != 3) {
fprintf(stderr, "Usage: %s <wait-to-start-sec> <device-index>\n", argv[0]);
return 1;
}

int waitStart = atoi(argv[1]);
int waitEnd = atoi(argv[2]);
int device = atoi(argv[3]);
int device = atoi(argv[2]);

// This string is used by PatternScanner to validate a proper start of this sample program inside the container
fprintf(stderr, "Starting CudaSample program\n");
fprintf(stderr, "Waiting for %d seconds before starting\n", waitStart);

// Give time for the eBPF program to load
@@ -62,12 +63,12 @@ int main(int argc, char **argv) {
cudaFree(ptr);
cudaStreamSynchronize(stream);

fprintf(stderr, "CUDA calls made. Waiting for %d seconds before exiting\n", waitEnd);
// we don't exit to avoid flakiness when the process is terminated before it was hooked for gpu monitoring
// the expected usage is to send a kill signal to the process (or stop the container that is running it)

// Give time for the agent to inspect this process and check environment variables/etc before this exits
sleep(waitEnd);

fprintf(stderr, "Exiting\n");
// this line is used as a marker by PatternScanner to indicate the end of the program
fprintf(stderr, "CUDA calls made.\n");
pause(); // Wait for signal to finish the process

return 0;
}
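For context, a hedged Go sketch of the usage described in the comments above: run the sample, scan its stderr for the end marker, then signal the paused process to stop. The helper below and its name are hypothetical; the repository's actual testutil.RunSample/PatternScanner helpers may differ.

package main

import (
	"bufio"
	"fmt"
	"os/exec"
	"strings"
	"syscall"
)

// runCudaSample starts the sample with <wait-to-start-sec> and <device-index>,
// waits for the "CUDA calls made." end marker on stderr, then signals the
// process, which is blocked in pause(), so it can exit.
func runCudaSample(bin string, waitStartSec, deviceIndex int) error {
	cmd := exec.Command(bin, fmt.Sprint(waitStartSec), fmt.Sprint(deviceIndex))
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return err
	}

	// Scan stderr for the end marker printed after the CUDA calls.
	scanner := bufio.NewScanner(stderr)
	for scanner.Scan() {
		if strings.Contains(scanner.Text(), "CUDA calls made.") {
			break
		}
	}

	// Stop the process explicitly once monitoring is done; Wait reports the signal.
	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return err
	}
	_ = cmd.Wait()
	return nil
}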