Skip to content

Commit

Permalink
Add E2E tests for OTLPReceiver's ReceiveResourceSpansV2
Browse files Browse the repository at this point in the history
  • Loading branch information
IbraheemA committed Nov 4, 2024
1 parent 8be82eb commit b933e87
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package otelagent

import (
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments"
localkubernetes "github.com/DataDog/datadog-agent/test/new-e2e/pkg/environments/local/kubernetes"
"github.com/DataDog/datadog-agent/test/new-e2e/tests/otel/utils"
"github.com/DataDog/test-infra-definitions/components/datadog/kubernetesagentparams"
"testing"
)

type otlpIngestTestSuiteWithSpanReceiverV2 struct {
e2e.BaseSuite[environments.Kubernetes]
}

func TestOTLPIngestWithSpanReceiverV2(t *testing.T) {
values := `
datadog:
logs:
containerCollectAll: false
containerCollectUsingFiles: false
agents:
containers:
otelAgent:
env:
- name: DD_APM_FEATURES
value: 'enable_receive_resource_spans_v2'
`
t.Parallel()
e2e.Run(t, &otlpIngestTestSuiteWithSpanReceiverV2{}, e2e.WithProvisioner(localkubernetes.Provisioner(localkubernetes.WithAgentOptions(kubernetesagentparams.WithoutDualShipping(), kubernetesagentparams.WithHelmValues(values), kubernetesagentparams.WithOTelAgent(), kubernetesagentparams.WithOTelConfig(minimalConfig)))))
}

var otlpIngestParams = utils.IAParams{
InfraAttributes: false,
}

func (s *otlpIngestTestSuiteWithSpanReceiverV2) SetupSuite() {
s.BaseSuite.SetupSuite()
utils.TestSpanReceiverV2(s)
}

func (s *otlpIngestTestSuiteWithSpanReceiverV2) TestTracesWithSpanReceiverV2() {
utils.TestTracesWithSpanReceiverV2(s, otlpIngestParams)
}
155 changes: 155 additions & 0 deletions test/new-e2e/tests/otel/utils/pipelines_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,89 @@ func TestTraces(s OTelTestSuite, iaParams IAParams) {
}
}

// TestTracesWithSpanReceiverV2 tests that OTLP traces are received through OTel pipelines as expected with updated OTLP span receiver
func TestTracesWithSpanReceiverV2(s OTelTestSuite, iaParams IAParams) {
err := s.Env().FakeIntake.Client().FlushServerAndResetAggregators()
require.NoError(s.T(), err)

var traces []*aggregator.TracePayload
s.T().Log("Waiting for traces")
require.EventuallyWithT(s.T(), func(c *assert.CollectT) {
traces, err = s.Env().FakeIntake.Client().GetTraces()
if !assert.NoError(c, err) {
return
}
if !assert.NotEmpty(c, traces) {
return
}
trace := traces[0]
if !assert.NotEmpty(s.T(), trace.TracerPayloads) {
return
}
tp := trace.TracerPayloads[0]
if !assert.NotEmpty(s.T(), tp.Chunks) {
return
}
if !assert.NotEmpty(s.T(), tp.Chunks[0].Spans) {
return
}
assert.Equal(s.T(), telemetrygenService, tp.Chunks[0].Spans[0].Service)
if iaParams.InfraAttributes {
ctags, ok := getContainerTags(s.T(), tp)
assert.True(s.T(), ok)
assert.NotNil(s.T(), ctags["kube_ownerref_kind"])
}
}, 5*time.Minute, 10*time.Second)
require.NotEmpty(s.T(), traces)
s.T().Log("Got traces", s.T().Name(), traces)

// Verify tags on traces and spans
tp := traces[0].TracerPayloads[0]
assert.Equal(s.T(), env, tp.Env)
assert.Equal(s.T(), version, tp.AppVersion)
assert.Empty(s.T(), tp.ContainerID)
require.NotEmpty(s.T(), tp.Chunks)
require.NotEmpty(s.T(), tp.Chunks[0].Spans)
spans := tp.Chunks[0].Spans
ctags, ok := getContainerTags(s.T(), tp)

for _, sp := range spans {
assert.Equal(s.T(), telemetrygenService, sp.Service)
assert.Equal(s.T(), env, sp.Meta["env"])
assert.Equal(s.T(), version, sp.Meta["version"])
assert.Equal(s.T(), customAttributeValue, sp.Meta[customAttribute])
if sp.Meta["span.kind"] == "client" {
assert.Equal(s.T(), "lets_go", sp.Name)
assert.Equal(s.T(), "lets-go", sp.Resource)
assert.Equal(s.T(), "http", sp.Type)
assert.Zero(s.T(), sp.ParentID)
} else {
assert.Equal(s.T(), "server", sp.Meta["span.kind"])
assert.Equal(s.T(), "okey_dokey_0", sp.Name)
assert.Equal(s.T(), "okey-dokey-0", sp.Resource)
assert.Equal(s.T(), "web", sp.Type)
assert.IsType(s.T(), uint64(0), sp.ParentID)
assert.NotZero(s.T(), sp.ParentID)
}
assert.IsType(s.T(), uint64(0), sp.TraceID)
assert.NotZero(s.T(), sp.TraceID)
assert.IsType(s.T(), uint64(0), sp.SpanID)
assert.NotZero(s.T(), sp.SpanID)
assert.Equal(s.T(), "telemetrygen", sp.Meta["otel.library.name"])
assert.Equal(s.T(), sp.Meta["k8s.node.name"], tp.Hostname)
assert.True(s.T(), ok)
assert.Equal(s.T(), sp.Meta["k8s.container.name"], ctags["kube_container_name"])
assert.Equal(s.T(), sp.Meta["k8s.namespace.name"], ctags["kube_namespace"])
assert.Equal(s.T(), sp.Meta["k8s.pod.name"], ctags["pod_name"])

// Verify container tags from infraattributes processor
if iaParams.InfraAttributes {
maps.Copy(ctags, sp.Meta)
testInfraTags(s.T(), ctags, iaParams)
}
}
}

// TestMetrics tests that OTLP metrics are received through OTel pipelines as expected
func TestMetrics(s OTelTestSuite, iaParams IAParams) {
err := s.Env().FakeIntake.Client().FlushServerAndResetAggregators()
Expand Down Expand Up @@ -319,6 +402,78 @@ func TestPrometheusMetrics(s OTelTestSuite) {
s.T().Log("Got otelcol_datadog_trace_agent_trace_writer_spans", traceAgentMetrics)
}

// TestSpanReceiverV2 tests that APM stats are correct when using probabilistic sampling
func TestSpanReceiverV2(s OTelTestSuite) {
ctx := context.Background()
err := s.Env().FakeIntake.Client().FlushServerAndResetAggregators()
require.NoError(s.T(), err)
numTraces := 10

s.T().Log("Starting telemetrygen")
createTelemetrygenJobForSpanReceiverV2(ctx, s, "traces", []string{"--traces", fmt.Sprint(numTraces)})
}

func createTelemetrygenJobForSpanReceiverV2(ctx context.Context, s OTelTestSuite, telemetry string, options []string) {
var ttlSecondsAfterFinished int32 = 0 //nolint:revive // We want to see this is explicitly set to 0
var backOffLimit int32 = 4

otlpEndpoint := fmt.Sprintf("%v:4317", s.Env().Agent.LinuxNodeAgent.LabelSelectors["app"])
jobSpec := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("telemetrygen-job-%v-%v", telemetry, strings.ReplaceAll(strings.ToLower(s.T().Name()), "/", "-")),
Namespace: "datadog",
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Env: []corev1.EnvVar{{
Name: "OTEL_SERVICE_NAME",
Value: telemetrygenService,
}, {
Name: "OTEL_K8S_POD_ID",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.uid"}},
}, {
Name: "OTEL_K8S_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}},
}, {
Name: "OTEL_K8S_NODE_NAME",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "spec.nodeName"}},
}, {
Name: "OTEL_K8S_POD_NAME",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
}},
Name: "telemetrygen-job",
Image: "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:v0.107.0",
Command: append([]string{
"/telemetrygen", telemetry, "--otlp-endpoint", otlpEndpoint, "--otlp-insecure",
"--telemetry-attributes", fmt.Sprintf("%v=%v", customAttribute, customAttributeValue),
"--telemetry-attributes", fmt.Sprintf("%v=\"%v\"", "k8s.pod.uid", "did-it-work"),
"--otlp-attributes", "service.name=\"$(OTEL_SERVICE_NAME)\"",
"--otlp-attributes", "host.name=\"$(OTEL_K8S_NODE_NAME)\"",
"--otlp-attributes", fmt.Sprintf("deployment.environment=\"%v\"", env),
"--otlp-attributes", fmt.Sprintf("service.version=\"%v\"", version),
"--otlp-attributes", "k8s.namespace.name=\"$(OTEL_K8S_NAMESPACE)\"",
"--otlp-attributes", "k8s.node.name=\"$(OTEL_K8S_NODE_NAME)\"",
"--otlp-attributes", "k8s.pod.name=\"$(OTEL_K8S_POD_NAME)\"",
//"--otlp-attributes", "k8s.pod.uid=\"$(OTEL_K8S_POD_ID)\"",
"--otlp-attributes", "k8s.container.name=\"telemetrygen-job\"",
}, options...),
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
BackoffLimit: &backOffLimit,
},
}

_, err := s.Env().KubernetesCluster.Client().BatchV1().Jobs("datadog").Create(ctx, jobSpec, metav1.CreateOptions{})
require.NoError(s.T(), err, "Could not properly start job")
}

func createTelemetrygenJob(ctx context.Context, s OTelTestSuite, telemetry string, options []string) {
var ttlSecondsAfterFinished int32 = 0 //nolint:revive // We want to see this is explicitly set to 0
var backOffLimit int32 = 4
Expand Down

0 comments on commit b933e87

Please sign in to comment.