Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serverless] S3 Downstream Span Pointers #32060

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
8 changes: 6 additions & 2 deletions pkg/serverless/invocationlifecycle/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package invocationlifecycle

import (
"fmt"
"github.com/DataDog/datadog-agent/pkg/serverless/spanpointers"
"strings"
"time"

Expand Down Expand Up @@ -126,8 +127,11 @@ func (lp *LifecycleProcessor) initFromKinesisStreamEvent(event events.KinesisEve
}

func (lp *LifecycleProcessor) initFromS3Event(event events.S3Event) {
if !lp.DetectLambdaLibrary() && lp.InferredSpansEnabled {
lp.GetInferredSpan().EnrichInferredSpanWithS3Event(event)
if !lp.DetectLambdaLibrary() {
if lp.InferredSpansEnabled {
lp.GetInferredSpan().EnrichInferredSpanWithS3Event(event)
}
lp.requestHandler.spanPointers = spanpointers.GetSpanPointersFromS3Event(event)
}

lp.requestHandler.event = event
Expand Down
4 changes: 3 additions & 1 deletion pkg/serverless/invocationlifecycle/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
serverlessLog "github.com/DataDog/datadog-agent/pkg/serverless/logs"
serverlessMetrics "github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/spanpointers"
"github.com/DataDog/datadog-agent/pkg/serverless/trace/inferredspan"
"github.com/DataDog/datadog-agent/pkg/serverless/trace/propagation"
"github.com/DataDog/datadog-agent/pkg/serverless/trigger"
Expand All @@ -42,14 +43,15 @@ type LifecycleProcessor struct {
}

// RequestHandler is the struct that stores information about the trace,
// inferred span, and tags about the current invocation
// inferred span, tags about the current invocation, and span pointers
// inferred spans may contain a secondary inferred span in certain cases like SNS from SQS
type RequestHandler struct {
executionInfo *ExecutionStartInfo
event interface{}
inferredSpans [2]*inferredspan.InferredSpan
triggerTags map[string]string
triggerMetrics map[string]float64
spanPointers []spanpointers.SpanPointer
}

// SetMetaTag sets a meta span tag. A meta tag is a tag whose value type is string.
Expand Down
31 changes: 25 additions & 6 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package invocationlifecycle

import (
"bytes"
"encoding/json"
"os"
"testing"
"time"
Expand Down Expand Up @@ -1026,12 +1027,30 @@ func TestTriggerTypesLifecycleEventForS3(t *testing.T) {
testProcessor.OnInvokeEnd(&InvocationEndDetails{
RequestID: "test-request-id",
})
assert.Equal(t, map[string]string{
"cold_start": "false",
"function_trigger.event_source_arn": "aws:s3:sample:event:source",
"request_id": "test-request-id",
"function_trigger.event_source": "s3",
}, testProcessor.GetTags())

tags := testProcessor.GetTags()
assert.Equal(t, "false", tags["cold_start"])
assert.Equal(t, "aws:s3:sample:event:source", tags["function_trigger.event_source_arn"])
assert.Equal(t, "test-request-id", tags["request_id"])
assert.Equal(t, "s3", tags["function_trigger.event_source"])

var actualSpanLinks []map[string]interface{}
err := json.Unmarshal([]byte(tags["_dd.span_links"]), &actualSpanLinks)
assert.NoError(t, err)

expectedSpanLinks := []map[string]interface{}{
{
"attributes": map[string]interface{}{
"link.kind": "span-pointer",
"ptr.dir": "u",
"ptr.hash": "1dc3e5d00dae48c1f07d95371a747788",
"ptr.kind": "aws.s3.object",
},
"span_id": "0",
"trace_id": "0",
},
}
assert.Equal(t, expectedSpanLinks, actualSpanLinks)
}

func TestTriggerTypesLifecycleEventForSNS(t *testing.T) {
Expand Down
27 changes: 26 additions & 1 deletion pkg/serverless/invocationlifecycle/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
)

const (
functionNameEnvVar = "AWS_LAMBDA_FUNCTION_NAME"
functionNameEnvVar = "AWS_LAMBDA_FUNCTION_NAME"
spanPointerLinkKind = "span-pointer"
spanPointerUpDirection = "u" // Tracers will handle cases where direction is down
)

var /* const */ runtimeRegex = regexp.MustCompile(`^(dotnet|go|java|ruby)(\d+(\.\d+)*|\d+(\.x))$`)
Expand Down Expand Up @@ -162,6 +164,29 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails)
}
}

if len(lp.requestHandler.spanPointers) > 0 {
var spanLinks []map[string]interface{}
for _, sp := range lp.requestHandler.spanPointers {
spanLink := map[string]interface{}{
"attributes": map[string]string{
"link.kind": spanPointerLinkKind,
"ptr.dir": spanPointerUpDirection,
"ptr.hash": sp.Hash,
"ptr.kind": sp.Kind,
},
"span_id": "0",
"trace_id": "0",
}
spanLinks = append(spanLinks, spanLink)
}
spanLinksJSON, err := json.Marshal(spanLinks)
if err != nil {
log.Debugf("Failed to marshal span links: %v\n", err)
} else {
executionSpan.Meta["_dd.span_links"] = string(spanLinksJSON)
}
}

return executionSpan
}

Expand Down
88 changes: 88 additions & 0 deletions pkg/serverless/invocationlifecycle/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package invocationlifecycle

import (
"encoding/json"
"github.com/DataDog/datadog-agent/pkg/serverless/spanpointers"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -809,6 +811,92 @@ func TestEndExecutionSpanWithStepFunctions(t *testing.T) {

}

func TestEndExecutionSpanWithSpanLinks(t *testing.T) {
tests := []struct {
name string
spanPointers []spanpointers.SpanPointer
expectedHashes []string
}{
{
name: "single span pointer",
spanPointers: []spanpointers.SpanPointer{
{
Hash: "abc",
Kind: "aws.s3.object",
},
},
expectedHashes: []string{"abc"},
},
{
name: "multiple span pointers",
spanPointers: []spanpointers.SpanPointer{
{
Hash: "def",
Kind: "aws.s3.object",
},
{
Hash: "ghi",
Kind: "aws.s3.object",
},
},
expectedHashes: []string{"def", "ghi"},
},
{
name: "no span pointers",
spanPointers: nil,
expectedHashes: []string{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
currentExecutionInfo := &ExecutionStartInfo{}
lp := &LifecycleProcessor{
requestHandler: &RequestHandler{
executionInfo: currentExecutionInfo,
spanPointers: tt.spanPointers,
triggerTags: make(map[string]string),
},
}

startTime := time.Now()
startDetails := &InvocationStartDetails{
StartTime: startTime,
InvokeEventHeaders: http.Header{},
}
lp.startExecutionSpan(nil, []byte("{}"), startDetails)

endDetails := &InvocationEndDetails{
EndTime: startTime.Add(1 * time.Second),
RequestID: "test-request-id",
ResponseRawPayload: []byte("{}"),
}

executionSpan := lp.endExecutionSpan(endDetails)

if len(tt.expectedHashes) == 0 {
assert.NotContains(t, executionSpan.Meta, "_dd.span_links")
return
}

var spanLinks []map[string]interface{}
err := json.Unmarshal([]byte(executionSpan.Meta["_dd.span_links"]), &spanLinks)
assert.NoError(t, err)
assert.Equal(t, len(tt.expectedHashes), len(spanLinks))
for i, spanLink := range spanLinks {
attributes, ok := spanLink["attributes"].(map[string]interface{})
assert.True(t, ok)
assert.Equal(t, tt.expectedHashes[i], attributes["ptr.hash"])
assert.Equal(t, "span-pointer", attributes["link.kind"])
assert.Equal(t, "u", attributes["ptr.dir"])
assert.Equal(t, "aws.s3.object", attributes["ptr.kind"])
assert.Equal(t, "0", spanLink["span_id"])
assert.Equal(t, "0", spanLink["trace_id"])
}
})
}
}

func TestParseLambdaPayload(t *testing.T) {
assert.Equal(t, []byte(""), ParseLambdaPayload([]byte("")))
assert.Equal(t, []byte("{}"), ParseLambdaPayload([]byte("{}")))
Expand Down
51 changes: 51 additions & 0 deletions pkg/serverless/spanpointers/span_pointers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// Package spanpointers provides a helper functions for span pointers
package spanpointers

import (
"crypto/sha256"
"encoding/hex"
"github.com/DataDog/datadog-agent/pkg/serverless/trigger/events"
"strings"
)

const (
s3PointerKind = "aws.s3.object"
)

// SpanPointer is a struct that stores a hash and span kind to uniquely
// identify a S3 or DynamoDB operation.
type SpanPointer struct {
Hash string
Kind string
}

func generateSpanPointerHash(components []string) string {
dataToHash := strings.Join(components, "|")
sum := sha256.Sum256([]byte(dataToHash))
return hex.EncodeToString(sum[:])[:32]
}

// GetSpanPointersFromS3Event calculates span pointer attributes to uniquely identify
// S3 event records. These attributes will later be used to create the _dd.span_links JSON object.
func GetSpanPointersFromS3Event(event events.S3Event) []SpanPointer {
var pointers []SpanPointer
for _, record := range event.Records {
bucketName := record.S3.Bucket.Name
key := record.S3.Object.Key
eTag := strings.Trim(record.S3.Object.ETag, "\"")

hash := generateSpanPointerHash([]string{bucketName, key, eTag})

spanPointer := SpanPointer{
Hash: hash,
Kind: s3PointerKind,
}
pointers = append(pointers, spanPointer)
}
return pointers
}
Loading
Loading