Skip to content

Commit

Permalink
update configs
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man committed Nov 4, 2024
1 parent 120df76 commit d5c7dec
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 10 deletions.
3 changes: 1 addition & 2 deletions comp/logs/agent/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ package config

// Pipeline constraints
const (
DestinationPayloadChanSize = 10
NumberOfPipelines = 4
NumberOfPipelines = 4
)

const (
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,9 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true)
config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com")
config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10)

// maximum time that the unix tailer will hold a log file open after it has been rotated
config.BindEnvAndSetDefault("logs_config.close_timeout", 60)
// maximum time that the windows tailer will hold a log file open, while waiting for
Expand All @@ -1538,7 +1541,6 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.tag_multi_line_logs", false)
// Add a tag to logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)
config.BindEnvAndSetDefault("logs_config.chan_size", 100)

// If true, the agent looks for container logs in the location used by podman, rather
// than docker. This is a temporary configuration parameter to support podman logs until
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() {
func (a *RegistryAuditor) createChannels() {
a.chansMutex.Lock()
defer a.chansMutex.Unlock()
a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size"))
a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
a.done = make(chan struct{})
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/logs/diagnostic/message_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func NewBufferedMessageReceiver(f Formatter, hostname hostnameinterface.Componen
}
}
return &BufferedMessageReceiver{
inputChan: make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size")),
inputChan: make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")),
formatter: f,
}
}

// Start opens new input channel
func (b *BufferedMessageReceiver) Start() {
b.inputChan = make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size"))
b.inputChan = make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
}

// Stop closes the input channel
Expand Down Expand Up @@ -109,7 +109,7 @@ func (b *BufferedMessageReceiver) HandleMessage(m *message.Message, rendered []b

// Filter writes the buffered events from the input channel formatted as a string to the output channel
func (b *BufferedMessageReceiver) Filter(filters *Filters, done <-chan struct{}) <-chan string {
out := make(chan string, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size"))
out := make(chan string, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
go func() {
defer close(out)
for {
Expand Down
6 changes: 3 additions & 3 deletions pkg/logs/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewPipeline(outputChan chan *message.Payload,

mainDestinations := getDestinations(endpoints, destinationsContext, pipelineMonitor, serverless, senderDoneChan, status, cfg)

strategyInput := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size"))
strategyInput := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large
flushChan := make(chan struct{})

Expand All @@ -79,9 +79,9 @@ func NewPipeline(outputChan chan *message.Payload,
}

strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineMonitor)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize, senderDoneChan, flushWg, pipelineMonitor)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, pkgconfigsetup.Datadog().GetInt("logs_config.payload_channel_size"), senderDoneChan, flushWg, pipelineMonitor)

inputChan := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.chan_size"))
inputChan := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))

processor := processor.New(cfg, inputChan, strategyInput, processingRules,
encoder, diagnosticMessageReceiver, hostname, pipelineMonitor)
Expand Down

0 comments on commit d5c7dec

Please sign in to comment.