Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition setting global tags in Azure Container Apps. #30743

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions cmd/serverless-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ package main
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -150,9 +152,31 @@ func setup(_ mode.Conf, tagger tagger.Component) (cloudservice.CloudService, *se
go flushMetricsAgent(metricAgent)
return cloudService, agentLogConfig, traceAgent, metricAgent, logsAgent
}

var azureContainerAppTags = []string{
"subscription_id",
"resource_group",
"resource_id",
"replicate_name",
"aca.subscription.id",
"aca.resource.group",
"aca.resource.id",
"aca.replica.name",
}

func setupTraceAgent(tags map[string]string, tagger tagger.Component) trace.ServerlessTraceAgent {
traceAgent := trace.StartServerlessTraceAgent(pkgconfigsetup.Datadog().GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath, Tagger: tagger}, nil, random.Random.Uint64())
traceAgent.SetTags(tags)
var azureTags strings.Builder
for _, azureContainerAppTag := range azureContainerAppTags {
if value, ok := tags[azureContainerAppTag]; ok {
azureTags.WriteString(fmt.Sprintf(",%s:%s", azureContainerAppTag, value))
}
}
traceAgent := trace.StartServerlessTraceAgent(trace.StartServerlessTraceAgentArgs{
Enabled: pkgconfigsetup.Datadog().GetBool("apm_config.enabled"),
LoadConfig: &trace.LoadConfig{Path: datadogConfigPath, Tagger: tagger},
ColdStartSpanID: random.Random.Uint64(),
AzureContainerAppTags: azureTags.String(),
})
go func() {
for range time.Tick(3 * time.Second) {
traceAgent.Flush()
Expand Down
7 changes: 6 additions & 1 deletion cmd/serverless/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,12 @@ func startOtlpAgent(wg *sync.WaitGroup, metricAgent *metrics.ServerlessMetricAge

func startTraceAgent(wg *sync.WaitGroup, lambdaSpanChan chan *pb.Span, coldStartSpanId uint64, serverlessDaemon *daemon.Daemon, tagger tagger.Component) {
defer wg.Done()
traceAgent := trace.StartServerlessTraceAgent(pkgconfigsetup.Datadog().GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath, Tagger: tagger}, lambdaSpanChan, coldStartSpanId)
traceAgent := trace.StartServerlessTraceAgent(trace.StartServerlessTraceAgentArgs{
Enabled: pkgconfigsetup.Datadog().GetBool("apm_config.enabled"),
LoadConfig: &trace.LoadConfig{Path: datadogConfigPath, Tagger: tagger},
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: coldStartSpanId,
})
serverlessDaemon.SetTraceAgent(traceAgent)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/serverless/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ func TestSetTraceTagOk(t *testing.T) {
}
t.Setenv("DD_API_KEY", "x")
t.Setenv("DD_RECEIVER_PORT", strconv.Itoa(testutil.FreeTCPPort(t)))
agent := trace.StartServerlessTraceAgent(true, &trace.LoadConfig{Path: "/does-not-exist.yml"}, make(chan *pb.Span), random.Random.Uint64())
agent := trace.StartServerlessTraceAgent(trace.StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &trace.LoadConfig{Path: "/does-not-exist.yml"},
LambdaSpanChan: make(chan *pb.Span),
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
d := Daemon{
TraceAgent: agent,
Expand Down
6 changes: 5 additions & 1 deletion pkg/serverless/daemon/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ func BenchmarkStartEndInvocation(b *testing.B) {
func startAgents() *Daemon {
d := StartDaemon(fmt.Sprint("127.0.0.1:", testutil.FreeTCPPort(nil)))

ta := trace.StartServerlessTraceAgent(true, &trace.LoadConfig{Path: "/some/path/datadog.yml"}, nil, 123)
ta := trace.StartServerlessTraceAgent(trace.StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &trace.LoadConfig{Path: "/some/path/datadog.yml"},
ColdStartSpanID: 123,
})
d.SetTraceAgent(ta)

ma := &metrics.ServerlessMetricAgent{
Expand Down
5 changes: 4 additions & 1 deletion pkg/serverless/otlp/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestServerlessOTLPAgentReceivesTraces(t *testing.T) {
t.Setenv("DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", grpcEndpoint)

// setup trace agent
traceAgent := trace.StartServerlessTraceAgent(true, &trace.LoadConfig{Path: "./testdata/valid.yml"}, nil, 0)
traceAgent := trace.StartServerlessTraceAgent(trace.StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &trace.LoadConfig{Path: "./testdata/valid.yml"},
})
defer traceAgent.Stop()
assert.NotNil(traceAgent)
traceChan := make(chan struct{})
Expand Down
20 changes: 15 additions & 5 deletions pkg/serverless/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,37 @@ func (l *LoadConfig) Load() (*config.AgentConfig, error) {
return comptracecfg.LoadConfigFile(l.Path, c, l.Tagger)
}

// StartServerlessTraceAgentArgs are the arguments for the StartServerlessTraceAgent method
type StartServerlessTraceAgentArgs struct {
Enabled bool
LoadConfig Load
LambdaSpanChan chan<- *pb.Span
ColdStartSpanID uint64
AzureContainerAppTags string
}

// Start starts the agent
//
//nolint:revive // TODO(SERV) Fix revive linter
func StartServerlessTraceAgent(enabled bool, loadConfig Load, lambdaSpanChan chan<- *pb.Span, coldStartSpanId uint64) ServerlessTraceAgent {
if enabled {
func StartServerlessTraceAgent(args StartServerlessTraceAgentArgs) ServerlessTraceAgent {
if args.Enabled {
// Set the serverless config option which will be used to determine if
// hostname should be resolved. Skipping hostname resolution saves >1s
// in load time between gRPC calls and agent commands.
pkgconfigsetup.Datadog().Set("serverless.enabled", true, model.SourceAgentRuntime)

tc, confErr := loadConfig.Load()
tc, confErr := args.LoadConfig.Load()
if confErr != nil {
log.Errorf("Unable to load trace agent config: %s", confErr)
} else {
context, cancel := context.WithCancel(context.Background())
tc.Hostname = ""
tc.SynchronousFlushing = true
tc.AzureContainerAppTags = args.AzureContainerAppTags
ta := agent.NewAgent(context, tc, telemetry.NewNoopCollector(), &statsd.NoOpClient{}, zstd.NewComponent())
ta.SpanModifier = &spanModifier{
coldStartSpanId: coldStartSpanId,
lambdaSpanChan: lambdaSpanChan,
coldStartSpanId: args.ColdStartSpanID,
lambdaSpanChan: args.LambdaSpanChan,
ddOrigin: getDDOrigin(),
}

Expand Down
33 changes: 28 additions & 5 deletions pkg/serverless/trace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func TestStartEnabledFalse(t *testing.T) {
setupTraceAgentTest(t)

lambdaSpanChan := make(chan *pb.Span)
agent := StartServerlessTraceAgent(false, nil, lambdaSpanChan, random.Random.Uint64())
agent := StartServerlessTraceAgent(StartServerlessTraceAgentArgs{
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
assert.NotNil(t, agent)
assert.IsType(t, noopTraceAgent{}, agent)
Expand All @@ -53,7 +56,12 @@ func TestStartEnabledTrueInvalidConfig(t *testing.T) {
setupTraceAgentTest(t)

lambdaSpanChan := make(chan *pb.Span)
agent := StartServerlessTraceAgent(true, &LoadConfigMocked{}, lambdaSpanChan, random.Random.Uint64())
agent := StartServerlessTraceAgent(StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &LoadConfigMocked{},
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
assert.NotNil(t, agent)
assert.IsType(t, noopTraceAgent{}, agent)
Expand All @@ -65,7 +73,12 @@ func TestStartEnabledTrueValidConfigInvalidPath(t *testing.T) {
lambdaSpanChan := make(chan *pb.Span)

t.Setenv("DD_API_KEY", "x")
agent := StartServerlessTraceAgent(true, &LoadConfig{Path: "invalid.yml"}, lambdaSpanChan, random.Random.Uint64())
agent := StartServerlessTraceAgent(StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &LoadConfig{Path: "invalid.yml"},
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
assert.NotNil(t, agent)
assert.IsType(t, &serverlessTraceAgent{}, agent)
Expand All @@ -76,7 +89,12 @@ func TestStartEnabledTrueValidConfigValidPath(t *testing.T) {

lambdaSpanChan := make(chan *pb.Span)

agent := StartServerlessTraceAgent(true, &LoadConfig{Path: "./testdata/valid.yml"}, lambdaSpanChan, random.Random.Uint64())
agent := StartServerlessTraceAgent(StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &LoadConfig{Path: "./testdata/valid.yml"},
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
assert.NotNil(t, agent)
assert.IsType(t, &serverlessTraceAgent{}, agent)
Expand All @@ -89,7 +107,12 @@ func TestLoadConfigShouldBeFast(t *testing.T) {
startTime := time.Now()
lambdaSpanChan := make(chan *pb.Span)

agent := StartServerlessTraceAgent(true, &LoadConfig{Path: "./testdata/valid.yml"}, lambdaSpanChan, random.Random.Uint64())
agent := StartServerlessTraceAgent(StartServerlessTraceAgentArgs{
Enabled: true,
LoadConfig: &LoadConfig{Path: "./testdata/valid.yml"},
LambdaSpanChan: lambdaSpanChan,
ColdStartSpanID: random.Random.Uint64(),
})
defer agent.Stop()
assert.True(t, time.Since(startTime) < time.Second)
}
Expand Down
18 changes: 2 additions & 16 deletions pkg/trace/api/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@ const (
profilingV1EndpointSuffix = "v1/input"
)

var azureContainerAppTags = []string{
"subscription_id",
"resource_group",
"resource_id",
"replicate_name",
"aca.subscription.id",
"aca.resource.group",
"aca.resource.id",
"aca.replica.name",
}

// profilingEndpoints returns the profiling intake urls and their corresponding
// api keys based on agent configuration. The main endpoint is always returned as
// the first element in the slice.
Expand Down Expand Up @@ -97,11 +86,8 @@ func (r *HTTPReceiver) profileProxyHandler() http.Handler {
tags.WriteString(fmt.Sprintf("functionname:%s", strings.ToLower(r.conf.LambdaFunctionName)))
tags.WriteString("_dd.origin:lambda")
}

for _, azureContainerAppTag := range azureContainerAppTags {
if value, ok := r.conf.GlobalTags[azureContainerAppTag]; ok {
tags.WriteString(fmt.Sprintf(",%s:%s", azureContainerAppTag, value))
}
if r.conf.AzureContainerAppTags != "" {
tags.WriteString(r.conf.AzureContainerAppTags)
}

return newProfileProxy(r.conf, targets, keys, tags.String(), r.statsd)
Expand Down
10 changes: 1 addition & 9 deletions pkg/trace/api/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,7 @@ func TestProfileProxyHandler(t *testing.T) {
}))
conf := newTestReceiverConfig()
conf.ProfilingProxy = config.ProfilingProxyConfig{DDURL: srv.URL}
conf.GlobalTags = map[string]string{
"subscription_id": "123",
"resource_group": "test-rg",
"resource_id": "456",
"aca.subscription.id": "123",
"aca.resource.group": "test-rg",
"aca.resource.id": "456",
"aca.replica.name": "test-replica",
}
conf.AzureContainerAppTags = ",subscription_id:123,resource_group:test-rg,resource_id:456,aca.subscription.id:123,aca.resource.group:test-rg,aca.resource.id:456,aca.replica.name:test-replica"
req, err := http.NewRequest("POST", "/some/path", nil)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ type AgentConfig struct {

// Lambda function name
LambdaFunctionName string

// Azure container apps tags, in the form of a comma-separated list of
// key-value pairs, starting with a comma
AzureContainerAppTags string
}

// RemoteClient client is used to APM Sampling Updates from a remote source.
Expand Down
Loading