Skip to content

Commit

Permalink
Auto multiline v2 improve telemetry (#29613)
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man authored Oct 2, 2024
1 parent 7825aa7 commit 1a98d38
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 28 deletions.
3 changes: 2 additions & 1 deletion pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,8 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.auto_multi_line.tokenizer_max_input_bytes", 60)
config.BindEnvAndSetDefault("logs_config.auto_multi_line.pattern_table_max_size", 20)
config.BindEnvAndSetDefault("logs_config.auto_multi_line.pattern_table_match_threshold", 0.75)
config.BindEnvAndSetDefault("logs_config.tag_auto_multi_line_logs", false)
// Add a tag to logs that are multiline aggregated
config.BindEnvAndSetDefault("logs_config.tag_multi_line_logs", false)
// Add a tag to logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)

Expand Down
18 changes: 7 additions & 11 deletions pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
"github.com/DataDog/datadog-agent/pkg/telemetry"
)

type bucket struct {
Expand Down Expand Up @@ -61,27 +61,25 @@ func (b *bucket) flush() *message.Message {
copy(content, data)

msg := message.NewRawMessage(content, b.message.Status, b.originalDataLen, b.message.ParsingExtra.Timestamp)
tlmTags := []string{}
tlmTags := []string{"false", "single_line"}

if b.lineCount > 1 {
msg.ParsingExtra.IsMultiLine = true
tlmTags = append(tlmTags, "line_type:multi_line")
tlmTags[1] = "multi_line"
if b.tagMultiLineLogs {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.AutoMultiLineTag)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.MultiLineSourceTag("auto_multiline"))
}
} else {
tlmTags = append(tlmTags, "line_type:single_line")
}

if b.truncated {
msg.ParsingExtra.IsTruncated = true
tlmTags = append(tlmTags, "truncated:true")
tlmTags[0] = "true"
if b.tagTruncatedLogs {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedReasonTag("auto_multiline"))
}
}

telemetry.GetStatsTelemetryProvider().Count("datadog.logs_agent.auto_multi_line_aggregator.flush", 1, tlmTags)
metrics.TlmAutoMultilineAggregatorFlush.Inc(tlmTags...)
return msg
}

Expand Down Expand Up @@ -135,7 +133,6 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) {
// If `startGroup` - flush the bucket.
if label == startGroup {
a.multiLineMatchInfo.Add(1)
telemetry.GetStatsTelemetryProvider().Count("datadog.logs_agent.auto_multi_line_aggregator.multiline_matches", 1, []string{""})
a.Flush()
}

Expand All @@ -149,7 +146,6 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) {

if !a.bucket.isEmpty() {
a.linesCombinedInfo.Add(1)
telemetry.GetStatsTelemetryProvider().Count("datadog.logs_agent.auto_multi_line_aggregator.lines_combined", 1, []string{""})
}

a.bucket.add(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func TestTagTruncatedLogs(t *testing.T) {

msg := <-outputChan
assert.True(t, msg.ParsingExtra.IsTruncated)
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedTag})
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedReasonTag("auto_multiline")})
assertMessageContent(t, msg, "1234567890...TRUNCATED...")

msg = <-outputChan
assert.True(t, msg.ParsingExtra.IsTruncated)
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedTag})
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.TruncatedReasonTag("auto_multiline")})
assertMessageContent(t, msg, "...TRUNCATED...1")

msg = <-outputChan
Expand All @@ -159,7 +159,7 @@ func TestTagMultiLineLogs(t *testing.T) {
msg := <-outputChan
assert.True(t, msg.ParsingExtra.IsMultiLine)
assert.True(t, msg.ParsingExtra.IsTruncated)
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.AutoMultiLineTag})
assert.Equal(t, msg.ParsingExtra.Tags, []string{message.MultiLineSourceTag("auto_multiline")})
assertMessageContent(t, msg, "12345\\n67890...TRUNCATED...")

msg = <-outputChan
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/internal/decoder/auto_multiline_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewAutoMultilineHandler(outputFn func(m *message.Message), maxContentSize i
maxContentSize,
flushTimeout,
pkgconfigsetup.Datadog().GetBool("logs_config.tag_truncated_logs"),
pkgconfigsetup.Datadog().GetBool("logs_config.tag_auto_multi_line_logs"),
pkgconfigsetup.Datadog().GetBool("logs_config.tag_multi_line_logs"),
tailerInfo),
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/internal/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Par
var lineHandler LineHandler
for _, rule := range source.Config().ProcessingRules {
if rule.Type == config.MultiLine {
lh := NewMultiLineHandler(outputFn, rule.Regex, config.AggregationTimeout(pkgconfigsetup.Datadog()), maxContentSize, false, tailerInfo)
lh := NewMultiLineHandler(outputFn, rule.Regex, config.AggregationTimeout(pkgconfigsetup.Datadog()), maxContentSize, false, tailerInfo, "multi_line")
syncSourceInfo(source, lh)
lineHandler = lh
}
Expand All @@ -111,7 +111,7 @@ func NewDecoderWithFraming(source *sources.ReplaceableSource, parser parsers.Par
// Save the pattern again for the next rotation
detectedPattern.Set(multiLinePattern)

lh := NewMultiLineHandler(outputFn, multiLinePattern, config.AggregationTimeout(pkgconfigsetup.Datadog()), maxContentSize, true, tailerInfo)
lh := NewMultiLineHandler(outputFn, multiLinePattern, config.AggregationTimeout(pkgconfigsetup.Datadog()), maxContentSize, true, tailerInfo, "legacy_auto_multi_line")
syncSourceInfo(source, lh)
lineHandler = lh
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/internal/decoder/legacy_auto_multiline_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (h *LegacyAutoMultilineHandler) switchToMultilineHandler(r *regexp.Regexp)
h.singleLineHandler = nil

// Build and start a multiline-handler
h.multiLineHandler = NewMultiLineHandler(h.outputFn, r, h.flushTimeout, h.lineLimit, true, h.tailerInfo)
h.multiLineHandler = NewMultiLineHandler(h.outputFn, r, h.flushTimeout, h.lineLimit, true, h.tailerInfo, "legacy_auto_multi_line")
h.source.RegisterInfo(h.multiLineHandler.countInfo)
h.source.RegisterInfo(h.multiLineHandler.linesCombinedInfo)
// stay with the multiline handler
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/internal/decoder/line_handler_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func benchmarkMultiLineHandler(b *testing.B, logs int, line string) {
messages[i] = getDummyMessageWithLF(fmt.Sprintf("%s %d", line, i))
}

h := NewMultiLineHandler(func(*message.Message) {}, regexp.MustCompile(`^[A-Za-z_]+ \d+, \d+ \d+:\d+:\d+ (AM|PM)`), 1000*time.Millisecond, 100, false, status.NewInfoRegistry())
h := NewMultiLineHandler(func(*message.Message) {}, regexp.MustCompile(`^[A-Za-z_]+ \d+, \d+ \d+:\d+:\d+ (AM|PM)`), 1000*time.Millisecond, 100, false, status.NewInfoRegistry(), "")

b.ResetTimer()
for n := 0; n < b.N; n++ {
Expand Down
8 changes: 4 additions & 4 deletions pkg/logs/internal/decoder/line_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestTrimSingleLine(t *testing.T) {
func TestMultiLineHandler(t *testing.T) {
re := regexp.MustCompile(`[0-9]+\.`)
outputFn, outputChan := lineHandlerChans()
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 20, false, status.NewInfoRegistry())
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 20, false, status.NewInfoRegistry(), "")

var output *message.Message

Expand Down Expand Up @@ -187,7 +187,7 @@ func TestMultiLineHandler(t *testing.T) {
func TestTrimMultiLine(t *testing.T) {
re := regexp.MustCompile(`[0-9]+\.`)
outputFn, outputChan := lineHandlerChans()
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry())
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry(), "")

var output *message.Message

Expand Down Expand Up @@ -216,7 +216,7 @@ func TestTrimMultiLine(t *testing.T) {
func TestMultiLineHandlerDropsEmptyMessages(t *testing.T) {
re := regexp.MustCompile(`[0-9]+\.`)
outputFn, outputChan := lineHandlerChans()
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry())
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry(), "")

h.process(getDummyMessage(""))

Expand Down Expand Up @@ -245,7 +245,7 @@ func TestSingleLineHandlerSendsRawInvalidMessages(t *testing.T) {
func TestMultiLineHandlerSendsRawInvalidMessages(t *testing.T) {
re := regexp.MustCompile(`[0-9]+\.`)
outputFn, outputChan := lineHandlerChans()
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry())
h := NewMultiLineHandler(outputFn, re, 250*time.Millisecond, 100, false, status.NewInfoRegistry(), "")

h.process(getDummyMessage("1.third line"))
h.process(getDummyMessage("fourth line"))
Expand Down
9 changes: 7 additions & 2 deletions pkg/logs/internal/decoder/multiline_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ type MultiLineHandler struct {
linesCombinedInfo *status.CountInfo
telemetryEnabled bool
linesCombined int
multiLineTagValue string
}

// NewMultiLineHandler returns a new MultiLineHandler.
func NewMultiLineHandler(outputFn func(*message.Message), newContentRe *regexp.Regexp, flushTimeout time.Duration, lineLimit int, telemetryEnabled bool, tailerInfo *status.InfoRegistry) *MultiLineHandler {
func NewMultiLineHandler(outputFn func(*message.Message), newContentRe *regexp.Regexp, flushTimeout time.Duration, lineLimit int, telemetryEnabled bool, tailerInfo *status.InfoRegistry, multiLineTagValue string) *MultiLineHandler {

i := status.NewMappedInfo("Multi-Line Pattern")
i.SetMessage("Pattern", newContentRe.String())
Expand All @@ -57,6 +58,7 @@ func NewMultiLineHandler(outputFn func(*message.Message), newContentRe *regexp.R
linesCombinedInfo: status.NewCountInfo("Lines Combined"),
telemetryEnabled: telemetryEnabled,
linesCombined: 0,
multiLineTagValue: multiLineTagValue,
}
return h
}
Expand Down Expand Up @@ -164,7 +166,10 @@ func (h *MultiLineHandler) sendBuffer() {
msg := message.NewRawMessage(content, h.status, h.linesLen, h.timestamp)
msg.ParsingExtra.IsTruncated = h.isBufferTruncated
if h.isBufferTruncated && pkgconfigsetup.Datadog().GetBool("logs_config.tag_truncated_logs") {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedReasonTag("multiline_regex"))
}
if h.isBufferTruncated && pkgconfigsetup.Datadog().GetBool("logs_config.tag_multi_line_logs") {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.MultiLineSourceTag(h.multiLineTagValue))
}
h.outputFn(msg)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/internal/decoder/single_line_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (h *SingleLineHandler) flush() {

func addTruncatedTag(msg *message.Message) {
if pkgconfigsetup.Datadog().GetBool("logs_config.tag_truncated_logs") {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedReasonTag("single_line"))
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,13 @@ func (m *Message) Tags() []string {
func (m *Message) TagsToString() string {
return m.Origin.TagsToString(m.ProcessingTags)
}

// TruncatedReasonTag returns a tag with the reason for truncation.
func TruncatedReasonTag(reason string) string {
return fmt.Sprintf("truncated:%s", reason)
}

// MultiLineSourceTag returns a tag for multiline logs.
func MultiLineSourceTag(source string) string {
return fmt.Sprintf("multiline:%s", source)
}
3 changes: 3 additions & 0 deletions pkg/logs/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ var (
//nolint:revive // TODO(AML) Fix revive linter
TlmDestinationHttpRespByStatusAndUrl = telemetry.NewCounter("logs", "destination_http_resp", []string{"status_code", "url"}, "Count of http responses by status code and destination url")

// TlmAutoMultilineAggregatorFlush Count of each line flushed from the auto mulitline aggregator.
TlmAutoMultilineAggregatorFlush = telemetry.NewCounter("logs", "auto_multi_line_aggregator_flush", []string{"truncated", "line_type"}, "Count of each line flushed from the auto mulitline aggregator")

// TlmLogsDiscardedFromSDSBuffer how many messages were dropped when waiting for an SDS configuration because the buffer is full
TlmLogsDiscardedFromSDSBuffer = telemetry.NewCounter("logs", "sds__dropped_from_buffer", nil, "Count of messages dropped from the buffer while waiting for an SDS configuration")
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/tailers/file/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (suite *TailerTestSuite) TestTruncatedTag() {

msg := <-suite.outputChan
tags := msg.Tags()
suite.Contains(tags, message.TruncatedTag)
suite.Contains(tags, message.TruncatedReasonTag("single_line"))
}

func (suite *TailerTestSuite) TestMutliLineAutoDetect() {
Expand Down

0 comments on commit 1a98d38

Please sign in to comment.