
Commit

Add bare minimum amount of code to allow streaming logs via kubelet API
sblumenthal committed Dec 4, 2024
1 parent f15536c commit 67f5c26
Showing 9 changed files with 747 additions and 1 deletion.
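The tailer that actually talks to the kubelet (pkg/logs/tailers/api/tailer.go) is among the files not expanded on this page. As rough orientation, here is a hypothetical, standalone sketch of what streaming logs via the kubelet API looks like: the endpoint is the kubelet's /containerLogs handler, while the base URL, port, and plain http.Get client are illustrative assumptions, not code from this commit.

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// streamContainerLogs follows a container's logs through the kubelet's
// /containerLogs/<namespace>/<pod>/<container> endpoint. follow=true keeps
// the connection open so new log lines stream in as they are written.
func streamContainerLogs(kubeletURL, namespace, pod, container string) error {
	url := fmt.Sprintf("%s/containerLogs/%s/%s/%s?follow=true",
		kubeletURL, namespace, pod, container)

	// A real client must authenticate against the kubelet and verify its TLS
	// certificate; plain http.Get is used here only to keep the sketch short.
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("kubelet returned %s", resp.Status)
	}

	// Copy the stream until the kubelet closes it or the container stops.
	_, err = io.Copy(os.Stdout, resp.Body)
	return err
}

func main() {
	// Placeholder values; the kubelet normally serves this endpoint on its
	// secured port (10250).
	if err := streamContainerLogs("https://127.0.0.1:10250", "default", "my-pod", "my-container"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}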
2 changes: 2 additions & 0 deletions pkg/logs/launchers/container/launcher.go
@@ -134,6 +134,8 @@ func (l *Launcher) loop(ctx context.Context, addedSources, removedSources chan *
func (l *Launcher) startSource(source *sourcesPkg.LogSource) {
containerID := source.Config.Identifier

log.Infof("BANANA: a source was detected %s", source.Config.Identifier)

// if this is not of a supported container type, ignore it
if _, ok := containerSourceTypes[source.Config.Type]; !ok {
return
102 changes: 102 additions & 0 deletions pkg/logs/launchers/container/tailerfactory/api.go
@@ -0,0 +1,102 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

package tailerfactory

// This file handles creating tailers which stream container logs via the
// kubelet API.

import (
"errors"
"fmt"
"time"

workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/internal/util/containersorpods"
"github.com/DataDog/datadog-agent/pkg/logs/launchers/container/tailerfactory/tailers"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
)

// makeApiTailer makes a kubelet-API-based tailer for the given source, or returns
// an error if it cannot do so (e.g., if the kubelet client cannot be created)
func (tf *factory) makeApiTailer(source *sources.LogSource) (Tailer, error) {
containerID := source.Config.Identifier

wmeta, ok := tf.workloadmetaStore.Get()
if !ok {
return nil, errors.New("workloadmeta store is not initialized")
}
pod, err := wmeta.GetKubernetesPodForContainer(containerID)
if err != nil {
return nil, fmt.Errorf("cannot find pod for container %q: %w", containerID, err)
}

var container *workloadmeta.OrchestratorContainer
for _, pc := range pod.GetAllContainers() {
if pc.ID == containerID {
container = &pc
break
}
}

if container == nil {
// this failure is impossible, as GetKubernetesPodForContainer found
// the pod by searching for this container
return nil, fmt.Errorf("cannot find container %q in pod %q", containerID, pod.Name)
}

ku, err := kubelet.GetKubeUtil()
if err != nil {
return nil, fmt.Errorf("Could not use kubelet client to collect logs for container %s: %w",
containerID, err)
}

// Note that it's not clear from k8s documentation that the container logs,
// or even the directory containing these logs, must exist at this point.
// To avoid incorrectly falling back to socket logging (or failing to log
// entirely) we do not check for the file here. This matches older
// kubernetes-launcher behavior.

//sourceName, serviceName := tf.defaultSourceAndService(source, containersorpods.LogPods)
//// New file source that inherits most of its parent's properties
//fileSource := sources.NewLogSource(
// fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, container.Name),
// &config.LogsConfig{
// Type: config.FileType,
// TailingMode: source.Config.TailingMode,
// Identifier: containerID,
// Path: path,
// Service: serviceName,
// Source: sourceName,
// Tags: source.Config.Tags,
// ProcessingRules: source.Config.ProcessingRules,
// AutoMultiLine: source.Config.AutoMultiLine,
// AutoMultiLineSampleSize: source.Config.AutoMultiLineSampleSize,
// AutoMultiLineMatchThreshold: source.Config.AutoMultiLineMatchThreshold,
// })

pipeline := tf.pipelineProvider.NextPipelineChan()
readTimeout := time.Duration(pkgconfigsetup.Datadog().GetInt("logs_config.docker_client_read_timeout")) * time.Second

// apply defaults for source and service directly to the LogSource struct (!!)
source.Config.Source, source.Config.Service = tf.defaultSourceAndService(source, containersorpods.LogPods)

return tailers.NewApiTailer(
ku,
containerID,
container.Name,
pod.Name,
pod.Namespace,
source,
pipeline,
readTimeout,
tf.registry,
tf.tagger,
), nil
}
8 changes: 7 additions & 1 deletion pkg/logs/launchers/container/tailerfactory/factory.go
@@ -12,6 +12,7 @@ package tailerfactory
import (
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/config/env"
"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/internal/util/containersorpods"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
@@ -71,7 +72,7 @@ func New(sources *sources.LogSources, pipelineProvider pipeline.Provider, regist

// MakeTailer implements Factory#MakeTailer.
func (tf *factory) MakeTailer(source *sources.LogSource) (Tailer, error) {
return tf.makeTailer(source, tf.useFile, tf.makeFileTailer, tf.makeSocketTailer)
return tf.makeTailer(source, tf.useFile, tf.makeFileTailer, tf.makeSocketTailer, tf.makeApiTailer)
}

// makeTailer makes a new tailer, using function pointers to allow testing.
@@ -80,13 +81,18 @@ func (tf *factory) makeTailer(
useFile func(*sources.LogSource) bool,
makeFileTailer func(*sources.LogSource) (Tailer, error),
makeSocketTailer func(*sources.LogSource) (Tailer, error),
makeApiTailer func(*sources.LogSource) (Tailer, error),
) (Tailer, error) {

// depending on the result of useFile, prefer either file logging or socket
// logging, but fall back to the opposite.

switch useFile(source) {
case true:
if env.IsFeaturePresent(env.EKSFargate) {
log.Infof("BANANA: tailer should be used for %s", source.Config.Identifier)
return makeApiTailer(source)
}
t, err := makeFileTailer(source)
if err == nil {
return t, nil
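The hunk above routes EKS Fargate sources straight to the kubelet-API tailer and keeps all three constructors injectable as function pointers, which is what makes the routing testable. Below is a hypothetical, standalone illustration of that pattern; every name in it is invented for the example and none of it comes from the Datadog code base (the real factory also handles the "prefer socket" branch, which is elided here).

package main

import "fmt"

type tailer interface{ Start() error }

type fakeTailer struct{ kind string }

func (f *fakeTailer) Start() error { return nil }

// pickTailer routes to one of three injected constructors the same way the
// factory's makeTailer does for the file-preferred case: on EKS Fargate
// (where there are no node log files to read) go straight to the kubelet
// API, otherwise try the file tailer and fall back to the socket tailer.
func pickTailer(useFile, onFargate bool, makeFile, makeSocket, makeAPI func() (tailer, error)) (tailer, error) {
	if useFile {
		if onFargate {
			return makeAPI()
		}
		if t, err := makeFile(); err == nil {
			return t, nil
		}
		return makeSocket()
	}
	return makeSocket()
}

func main() {
	t, _ := pickTailer(true, true,
		func() (tailer, error) { return &fakeTailer{kind: "file"}, nil },
		func() (tailer, error) { return &fakeTailer{kind: "socket"}, nil },
		func() (tailer, error) { return &fakeTailer{kind: "api"}, nil },
	)
	fmt.Printf("selected %s tailer\n", t.(*fakeTailer).kind) // prints "selected api tailer"
}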
189 changes: 189 additions & 0 deletions pkg/logs/launchers/container/tailerfactory/tailers/api.go
@@ -0,0 +1,189 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

//nolint:revive // TODO(AML) Fix revive linter
package tailers

import (
"context"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
apiTailerPkg "github.com/DataDog/datadog-agent/pkg/logs/tailers/api"
dockerutilPkg "github.com/DataDog/datadog-agent/pkg/util/docker"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/log"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
)

// ApiTailer wraps pkg/logs/tailers/api.Tailer to satisfy
// the container launcher's `Tailer` interface, and to handle the
// erroredContainerID channel.
//
// NOTE: once the inner tailer can satisfy the Tailer interface directly and
// handle connection failures on its own, this wrapper will no longer be
// necessary.
type ApiTailer struct {
// arguments to apiTailerPkg.NewTailer (except erroredContainerID)

kubeUtil kubelet.KubeUtilInterface
ContainerID string
ContainerName string
PodName string
PodNamespace string
source *sources.LogSource
pipeline chan *message.Message
readTimeout time.Duration
tagger tagger.Component

// registry is used to calculate `since`
registry auditor.Registry

// ctx controls the run loop
ctx context.Context

// cancel stops the run loop
cancel context.CancelFunc

// stopped is closed when the run loop finishes
stopped chan struct{}
}

// NewApiTailer creates a new kubelet-API tailer
func NewApiTailer(kubeutil kubelet.KubeUtilInterface, containerID, containerName, podName, podNamespace string, source *sources.LogSource, pipeline chan *message.Message, readTimeout time.Duration, registry auditor.Registry, tagger tagger.Component) *ApiTailer {
return &ApiTailer{
kubeUtil: kubeutil,
ContainerID: containerID,
ContainerName: containerName,
PodName: podName,
PodNamespace: podNamespace,
source: source,
pipeline: pipeline,
readTimeout: readTimeout,
registry: registry,
tagger: tagger,
ctx: nil,
cancel: nil,
stopped: nil,
}
}

// tryStartTailer tries to start the inner tailer, returning an erroredContainerID channel if
// successful.
func (t *ApiTailer) tryStartTailer() (*apiTailerPkg.Tailer, chan string, error) {
erroredContainerID := make(chan string)
inner := apiTailerPkg.NewTailer(
t.kubeUtil,
t.ContainerID,
t.ContainerName,
t.PodName,
t.PodNamespace,
t.source,
t.pipeline,
erroredContainerID,
t.readTimeout,
t.tagger,
)
since, err := since(t.registry, inner.Identifier())
if err != nil {
log.Warnf("Could not recover tailing from last committed offset %v: %v",
dockerutilPkg.ShortContainerID(t.ContainerID), err)
// (the `since` value is still valid)
}

err = inner.Start(since)
if err != nil {
return nil, nil, err
}
return inner, erroredContainerID, nil
}

// stopTailer stops the inner tailer.
func (t *ApiTailer) stopTailer(inner *apiTailerPkg.Tailer) {
inner.Stop()
}

// Start implements Tailer#Start.
func (t *ApiTailer) Start() error {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.stopped = make(chan struct{})
go t.run(t.tryStartTailer, t.stopTailer)
return nil
}

// Stop implements Tailer#Stop.
func (t *ApiTailer) Stop() {
t.cancel()
t.cancel = nil
<-t.stopped
}

// run implements a loop to monitor the tailer and re-create it if it fails. It takes
// pointers to tryStartTailer and stopTailer to support testing.
func (t *ApiTailer) run(
tryStartTailer func() (*apiTailerPkg.Tailer, chan string, error),
stopTailer func(*apiTailerPkg.Tailer),
) {
defer close(t.stopped)

backoffDuration := backoffInitialDuration

for {
var backoffTimerC <-chan time.Time

// try to start the inner tailer
inner, erroredContainerID, err := tryStartTailer()
if err != nil {
if backoffDuration > backoffMaxDuration {
log.Warnf("Could not tail container %v: %v",
dockerutilPkg.ShortContainerID(t.ContainerID), err)
return
}
// set up to wait before trying again
backoffTimerC = time.After(backoffDuration)
backoffDuration *= 2
} else {
// success, so reset backoff
backoffTimerC = nil
backoffDuration = backoffInitialDuration
}

select {
case <-t.ctx.Done():
// the launcher has requested that the tailer stop
if inner != nil {
// Ensure any pending errors are cleared when we try to stop the tailer. Since erroredContainerID
// is unbuffered, any pending writes to this channel could cause a deadlock as the tailer's stop
// condition is managed in the same goroutine in apiTailerPkg.
go func() {
//nolint:revive // TODO(AML) Fix revive linter
for range erroredContainerID {
}
}()
stopTailer(inner)
if erroredContainerID != nil {
close(erroredContainerID)
}
}
return

case <-erroredContainerID:
// the inner tailer has failed after it has started
if inner != nil {
stopTailer(inner)
}
continue // retry

case <-backoffTimerC:
// it's time to retry starting the tailer
continue
}
}
}
49 changes: 49 additions & 0 deletions pkg/logs/tailers/api/reader.go
@@ -0,0 +1,49 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

//nolint:revive // TODO(AML) Fix revive linter
package api

import (
"errors"
"io"
)

var errReaderNotInitialized = errors.New("reader not initialized")

// safeReader wraps an io.ReadCloser in such a way that a nil reader is
// treated as a recoverable error and does not cause a panic. This is a
// belt-and-suspenders way to avoid such nil-pointer panics; see
// https://github.com/DataDog/datadog-agent/pull/2817
type safeReader struct {
reader io.ReadCloser
}

func newSafeReader() *safeReader {
return &safeReader{}
}

func (s *safeReader) setUnsafeReader(reader io.ReadCloser) {
s.reader = reader
}

func (s *safeReader) Read(p []byte) (int, error) {
if s.reader == nil {
err := errReaderNotInitialized
return 0, err
}

return s.reader.Read(p)
}

func (s *safeReader) Close() error {
if s.reader == nil {
return errReaderNotInitialized
}

return s.reader.Close()
}
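A hypothetical test-style sketch of how safeReader behaves, not part of this commit; it assumes it sits next to reader.go in the same package (with the same build tag), and the string it reads is a stand-in for the kubelet response body.

//go:build docker

package api

import (
	"io"
	"strings"
	"testing"
)

// TestSafeReaderSketch shows that a safeReader with no underlying stream
// fails with errReaderNotInitialized instead of panicking, and reads
// normally once a stream has been attached.
func TestSafeReaderSketch(t *testing.T) {
	r := newSafeReader()

	buf := make([]byte, 64)
	if _, err := r.Read(buf); err != errReaderNotInitialized {
		t.Fatalf("expected errReaderNotInitialized, got %v", err)
	}
	if err := r.Close(); err != errReaderNotInitialized {
		t.Fatalf("expected errReaderNotInitialized on Close, got %v", err)
	}

	// Attach a stand-in for the kubelet log stream and read from it.
	r.setUnsafeReader(io.NopCloser(strings.NewReader("hello\n")))
	n, err := r.Read(buf)
	if err != nil || string(buf[:n]) != "hello\n" {
		t.Fatalf("unexpected read: %q, %v", buf[:n], err)
	}
}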
