Skip to content

Commit

Permalink
Add support for trace spans to the infra attributes processor (#26114)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinooliva authored Jul 19, 2024
1 parent 9333b38 commit d699e3b
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (f *factory) createTracesProcessor(
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
iap, err := newInfraAttributesSpanProcessor(set, cfg.(*Config))
iap, err := newInfraAttributesSpanProcessor(set, cfg.(*Config), f.tagger)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,67 @@ package infraattributesprocessor
import (
"context"

"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"

"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
)

type infraAttributesSpanProcessor struct {
logger *zap.Logger
logger *zap.Logger
tagger tagger.Component
cardinality types.TagCardinality
}

func newInfraAttributesSpanProcessor(set processor.Settings, _ *Config) (*infraAttributesSpanProcessor, error) {
tesp := &infraAttributesSpanProcessor{
logger: set.Logger,
func newInfraAttributesSpanProcessor(set processor.Settings, cfg *Config, tagger tagger.Component) (*infraAttributesSpanProcessor, error) {
iasp := &infraAttributesSpanProcessor{
logger: set.Logger,
tagger: tagger,
cardinality: cfg.Cardinality,
}
set.Logger.Info("Span Infra Attributes Processor configured")
return tesp, nil
return iasp, nil
}

func (tesp *infraAttributesSpanProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
func (iasp *infraAttributesSpanProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
resourceAttributes := rss.At(i).Resource().Attributes()
entityIDs := entityIDsFromAttributes(resourceAttributes)
tagMap := make(map[string]string)

// Get all unique tags from resource attributes and global tags
for _, entityID := range entityIDs {
entityTags, err := iasp.tagger.Tag(entityID, iasp.cardinality)
if err != nil {
iasp.logger.Error("Cannot get tags for entity", zap.String("entityID", entityID), zap.Error(err))
continue
}
for _, tag := range entityTags {
k, v := splitTag(tag)
_, hasTag := tagMap[k]
if k != "" && v != "" && !hasTag {
tagMap[k] = v
}
}
}
globalTags, err := iasp.tagger.GlobalTags(iasp.cardinality)
if err != nil {
iasp.logger.Error("Cannot get global tags", zap.Error(err))
}
for _, tag := range globalTags {
k, v := splitTag(tag)
_, hasTag := tagMap[k]
if k != "" && v != "" && !hasTag {
tagMap[k] = v
}
}
// Add all tags as resource attributes
for k, v := range tagMap {
resourceAttributes.PutStr(k, v)
}
}
return td, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,87 +9,152 @@ import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl/collectors"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
)

// All the data we need to test the Span
type testTrace struct {
spanName string
libraryName string
libraryVersion string
type traceNameTest struct {
name string
inTraces ptrace.Traces
outResourceAttributes []map[string]any
}

// All the data we need to define a test
type traceTest struct {
name string
inTraces ptrace.Traces
type traceWithResource struct {
traceNames []string
resourceAttributes map[string]any
}

var (
nameTraces = []testTrace{
{
spanName: "test!",
libraryName: "otel",
libraryVersion: "11",
},
inTraceNames = []string{
"full_name_match",
}

standardTraceTests = []traceTest{
standardTraceTests = []traceNameTest{
{
name: "one tag with global",
inTraces: testResourceTraces([]traceWithResource{{
traceNames: inTraceNames,
resourceAttributes: map[string]any{
"container.id": "test",
},
}}),
outResourceAttributes: []map[string]any{{
"global": "tag",
"container.id": "test",
"container": "id",
}},
},
{
name: "keepServiceName",
inTraces: generateTraces(nameTraces),
name: "two tags with global",
inTraces: testResourceTraces([]traceWithResource{{
traceNames: inTraceNames,
resourceAttributes: map[string]any{
"container.id": "test",
"k8s.namespace.name": "namespace",
"k8s.deployment.name": "deployment",
},
}}),
outResourceAttributes: []map[string]any{{
"global": "tag",
"container.id": "test",
"k8s.namespace.name": "namespace",
"k8s.deployment.name": "deployment",
"container": "id",
"deployment": "name",
}},
},
{
name: "two resource traces, two tags with global",
inTraces: testResourceTraces([]traceWithResource{
{
traceNames: inTraceNames,
resourceAttributes: map[string]any{
"container.id": "test",
},
},
{
traceNames: inTraceNames,
resourceAttributes: map[string]any{
"k8s.namespace.name": "namespace",
"k8s.deployment.name": "deployment",
},
}}),
outResourceAttributes: []map[string]any{
{
"global": "tag",
"container.id": "test",
"container": "id",
},
{
"global": "tag",
"k8s.namespace.name": "namespace",
"k8s.deployment.name": "deployment",
"deployment": "name",
},
},
},
}
)

func testResourceTraces(twrs []traceWithResource) ptrace.Traces {
td := ptrace.NewTraces()

for _, twr := range twrs {
rs := td.ResourceSpans().AppendEmpty()
//nolint:errcheck
rs.Resource().Attributes().FromRaw(twr.resourceAttributes)
ts := rs.ScopeSpans().AppendEmpty().Spans()
for _, name := range twr.traceNames {
ts.AppendEmpty().SetName(name)
}
}
return td
}

func TestInfraAttributesTraceProcessor(t *testing.T) {
for _, test := range standardTraceTests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
next := new(consumertest.TracesSink)
cfg := &Config{}
cfg := &Config{
Traces: TraceInfraAttributes{},
Cardinality: types.LowCardinality,
}
fakeTagger := taggerimpl.SetupFakeTagger(t)
defer fakeTagger.ResetTagger()
fakeTagger.SetTags("container_id://test", "test", []string{"container:id"}, nil, nil, nil)
fakeTagger.SetTags("deployment://namespace/deployment", "test", []string{"deployment:name"}, nil, nil, nil)
fakeTagger.SetTags(collectors.GlobalEntityID, "test", []string{"global:tag"}, nil, nil, nil)
factory := NewFactory(fakeTagger)
fmp, err := factory.CreateTracesProcessor(
ctx,
context.Background(),
processortest.NewNopSettings(),
cfg,
next,
)
require.NotNil(t, fmp)
require.NoError(t, err)
assert.NotNil(t, fmp)
assert.NoError(t, err)

caps := fmp.Capabilities()
require.True(t, caps.MutatesData)

require.NoError(t, fmp.Start(ctx, nil))
assert.True(t, caps.MutatesData)
ctx := context.Background()
assert.NoError(t, fmp.Start(ctx, nil))

cErr := fmp.ConsumeTraces(ctx, test.inTraces)
require.Nil(t, cErr)
cErr := fmp.ConsumeTraces(context.Background(), test.inTraces)
assert.Nil(t, cErr)
assert.NoError(t, fmp.Shutdown(ctx))

require.NoError(t, fmp.Shutdown(ctx))
assert.Len(t, next.AllTraces(), 1)
for i, out := range test.outResourceAttributes {
trs := next.AllTraces()[0].ResourceSpans().At(i)
assert.NotNil(t, trs)
assert.EqualValues(t, out, trs.Resource().Attributes().AsRaw())
}
})
}
}

func generateTraces(traces []testTrace) ptrace.Traces {
td := ptrace.NewTraces()

for _, trace := range traces {
rs := td.ResourceSpans().AppendEmpty()
//nolint:errcheck
ils := rs.ScopeSpans().AppendEmpty()
ils.Scope().SetName(trace.libraryName)
ils.Scope().SetVersion(trace.libraryVersion)
span := ils.Spans().AppendEmpty()
//nolint:errcheck
span.SetName(trace.spanName)
}
return td
}

0 comments on commit d699e3b

Please sign in to comment.