Sidecar: Enable Lyft type sidecars
Datadog: **NOT FROM UPSTREAM K8S**. From Lyft: lyft@266a18a

This commit combines the multiple commits we've made over time to cherry-pick and carry the Lyft sidecar patches forward.

datadog:patch
nyodas committed Nov 4, 2024
1 parent 4be311b commit 8692023
Showing 8 changed files with 657 additions and 254 deletions.
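For context on the mechanism this patch wires through the kubelet: a container is treated as a Lyft sidecar when its pod carries the annotation sidecars.lyft.net/container-lifecycle-<containerName>: Sidecar (the key format used by isSidecar and restoreSpecsFromContainerLabels below). A minimal standalone sketch of that check, with made-up pod and container names:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// "envoy" opts in as a sidecar; "app" is a regular container.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example",
			Namespace: "default",
			Annotations: map[string]string{
				"sidecars.lyft.net/container-lifecycle-envoy": "Sidecar",
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{Name: "envoy"}, {Name: "app"}},
		},
	}
	for _, c := range pod.Spec.Containers {
		key := fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", c.Name)
		fmt.Printf("container %q sidecar=%v\n", c.Name, pod.Annotations[key] == "Sidecar")
	}
}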
18 changes: 16 additions & 2 deletions pkg/kubelet/kubelet.go
@@ -40,9 +40,8 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/informers"
"k8s.io/mount-utils"

utilfs "k8s.io/kubernetes/pkg/util/filesystem"
mount "k8s.io/mount-utils"
netutils "k8s.io/utils/net"

v1 "k8s.io/api/core/v1"
@@ -114,12 +113,14 @@ import (
"k8s.io/kubernetes/pkg/kubelet/userns"
"k8s.io/kubernetes/pkg/kubelet/userns/inuserns"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
"k8s.io/kubernetes/pkg/security/apparmor"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
@@ -2698,6 +2699,8 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// TODO: reconcile being calculated in the config manager is questionable, and avoiding
// extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be
// merged (after resolving the next two TODOs).
sidecarsStatus := status.GetSidecarsStatus(pod)
klog.Infof("Pod: %s, status: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), sidecarsStatus.SidecarsPresent, sidecarsStatus.SidecarsReady, sidecarsStatus.ContainersWaiting)

// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
// TODO: this should be unnecessary today - determine what is the cause for this to
@@ -2710,6 +2713,17 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
} else if sidecarsStatus.ContainersWaiting {
// if the containers aren't running and the sidecars are all ready, trigger a sync so that the containers get started
if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady {
klog.Infof("Pod: %s: sidecars: sidecars are ready, dispatching work", format.Pod(pod))
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
}

// After an evicted pod is synced, all dead containers in the pod can be removed.
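HandlePodReconcile above keys off status.GetSidecarsStatus, which this commit adds elsewhere and which is not part of the excerpt shown here. A rough sketch of the shape it would need, assuming the three flags logged above are derived from the pod's annotations and container statuses; the derivation below is an illustration, not the actual implementation:

package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// SidecarsStatus mirrors the three flags logged by HandlePodReconcile.
type SidecarsStatus struct {
	SidecarsPresent   bool
	SidecarsReady     bool
	ContainersWaiting bool
}

// GetSidecarsStatus is an assumed reconstruction: sidecars are identified by
// the sidecars.lyft.net annotation, readiness comes from container statuses,
// and ContainersWaiting reports whether any non-sidecar has yet to start.
func GetSidecarsStatus(pod *v1.Pod) SidecarsStatus {
	s := SidecarsStatus{SidecarsReady: true}
	statuses := map[string]v1.ContainerStatus{}
	for _, cs := range pod.Status.ContainerStatuses {
		statuses[cs.Name] = cs
	}
	for _, c := range pod.Spec.Containers {
		key := fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", c.Name)
		cs, seen := statuses[c.Name]
		if pod.Annotations[key] == "Sidecar" {
			s.SidecarsPresent = true
			if !seen || !cs.Ready {
				s.SidecarsReady = false
			}
		} else if !seen || cs.State.Waiting != nil {
			s.ContainersWaiting = true
		}
	}
	return s
}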
47 changes: 44 additions & 3 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
@@ -729,6 +729,12 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(ctx context.

l := getContainerInfoFromLabels(s.Labels)
a := getContainerInfoFromAnnotations(s.Annotations)

annotations := make(map[string]string)
if a.Sidecar {
annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", l.ContainerName)] = "Sidecar"
}

// Notice that the followings are not full spec. The container killing code should not use
// un-restored fields.
pod = &v1.Pod{
@@ -737,6 +743,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(ctx context.
Name: l.PodName,
Namespace: l.PodNamespace,
DeletionGracePeriodSeconds: a.PodDeletionGracePeriod,
Annotations: annotations,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: a.PodTerminationGracePeriod,
@@ -762,8 +769,15 @@ func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.P
var containerSpec *v1.Container
if pod != nil {
if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q",
containerName, containerID.String(), format.Pod(pod), message)
// after a kubelet restart, it's not 100% certain that the
// pod we're given has the container we need in the spec
// -- we try to recover that here.
restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(ctx, containerID)
if err != nil {
return fmt.Errorf("failed to get containerSpec %q(id=%q) in pod %q when killing container for reason %q. error: %v",
containerName, containerID.String(), format.Pod(pod), message, err)
}
pod, containerSpec = restoredPod, restoredContainer
}
} else {
// Restore necessary information if one of the specs is nil.
@@ -826,9 +840,27 @@ func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.P

// killContainersWithSyncResult kills all pod's containers with sync results.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
// split out sidecars and non-sidecars
var (
sidecars []*kubecontainer.Container
nonSidecars []*kubecontainer.Container
)

if gracePeriodOverride == nil {
minGracePeriod := int64(minimumGracePeriodInSeconds)
gracePeriodOverride = &minGracePeriod
}

for _, container := range runningPod.Containers {
if isSidecar(pod, container.Name) {
sidecars = append(sidecars, container)
} else {
nonSidecars = append(nonSidecars, container)
}
}
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}

wg := sync.WaitGroup{}
wg.Add(len(runningPod.Containers))
var termOrdering *terminationOrdering
// we only care about container termination ordering if the sidecars feature is enabled
@@ -839,6 +871,15 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
}
termOrdering = newTerminationOrdering(pod, runningContainerNames)
}

if len(sidecars) > 0 {
var runningContainerNames []string
for _, container := range runningPod.Containers {
runningContainerNames = append(runningContainerNames, container.Name)
}
termOrdering = lyftSidecarTerminationOrdering(pod, runningContainerNames)
}

for _, container := range runningPod.Containers {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
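killContainersWithSyncResult above splits the running containers into sidecars and non-sidecars and builds a lyftSidecarTerminationOrdering (added elsewhere in this commit, not shown in this excerpt). A simplified standalone sketch of the intended shutdown order only, assuming regular containers are stopped first and sidecars only once they have all exited; this is not the actual terminationOrdering implementation:

package sketch

import "sync"

// stopInOrder is a stand-in for the ordering applied above: non-sidecars are
// stopped concurrently first, and sidecars (e.g. a proxy) are stopped only
// after every non-sidecar has terminated, so they stay up while the app drains.
func stopInOrder(nonSidecars, sidecars []string, stop func(name string)) {
	var wg sync.WaitGroup
	for _, name := range nonSidecars {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			stop(name)
		}(name)
	}
	// wait for all non-sidecars before touching any sidecar
	wg.Wait()
	for _, name := range sidecars {
		stop(name)
	}
}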
82 changes: 81 additions & 1 deletion pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache"
@@ -540,6 +541,14 @@ func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) boo
return cStatus.ExitCode == 0
}

func isSidecar(pod *v1.Pod, containerName string) bool {
if pod == nil {
klog.V(5).Infof("isSidecar: pod is nil, so returning false")
return false
}
return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar"
}

func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return false
@@ -835,6 +844,17 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *

handleRestartableInitContainers := utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) && types.HasRestartableInitContainer(pod)

var sidecarNames []string
for _, container := range pod.Spec.Containers {
if isSidecar(pod, container.Name) {
sidecarNames = append(sidecarNames, container.Name)
}
}

// determine sidecar status
sidecarStatus := status.GetSidecarsStatus(pod)
klog.Infof("Pod: %s, sidecars: %s, status: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), sidecarNames, sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting)

// If we need to (re-)create the pod sandbox, everything will need to be
// killed and recreated, and init containers should be purged.
if createPodSandbox {
@@ -890,7 +910,22 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *

return changes
}
changes.ContainersToStart = containersToStart
if len(sidecarNames) > 0 {
for idx, c := range pod.Spec.Containers {
if isSidecar(pod, c.Name) {
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
}
return changes
}
// Start all containers by default but exclude the ones that
// succeeded if RestartPolicy is OnFailure
for idx, c := range pod.Spec.Containers {
if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
continue
}
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
return changes
}

@@ -951,6 +986,21 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
keepCount := 0
// check the status of containers.
for idx, container := range pod.Spec.Containers {
// this works because in other cases, if it was a sidecar, we
// are always allowed to handle the container.
//
// if it is a non-sidecar, and there are no sidecars, then
// we are also always allowed to restart the container.
//
// if there are sidecars, then we can only restart non-sidecars under
// the following conditions:
// - the non-sidecars have run before (i.e. they are not in a Waiting state) OR
// - the sidecars are ready (we're starting them for the first time)
if !isSidecar(pod, container.Name) && sidecarStatus.SidecarsPresent && sidecarStatus.ContainersWaiting && !sidecarStatus.SidecarsReady {
klog.Infof("Pod: %s, Container: %s, sidecar=%v skipped: Present=%v,Ready=%v,ContainerWaiting=%v", format.Pod(pod), container.Name, isSidecar(pod, container.Name), sidecarStatus.SidecarsPresent, sidecarStatus.SidecarsReady, sidecarStatus.ContainersWaiting)
continue
}

containerStatus := podStatus.FindContainerStatusByName(container.Name)

// Call internal container post-stop lifecycle hook for any non-running container so that any
@@ -1028,6 +1078,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
}

if keepCount == 0 && len(changes.ContainersToStart) == 0 {
klog.Infof("Pod: %s: KillPod=true", format.Pod(pod))
changes.KillPod = true
if handleRestartableInitContainers {
// To prevent the restartable init containers to keep pod alive, we should
@@ -1457,6 +1508,35 @@ func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Contain
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
// it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
func (m *kubeGenericRuntimeManager) KillPod(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
// if the pod is nil, we need to recover it, so we can get the
// grace period and also the sidecar status.
if pod == nil {
for _, container := range runningPod.Containers {
klog.Infof("Pod: %s, KillPod: pod nil, trying to restore from container %s", runningPod.Name, container.ID)
podSpec, _, err := m.restoreSpecsFromContainerLabels(ctx, container.ID)
if err != nil {
klog.Errorf("Pod: %s, KillPod: couldn't restore: %s", runningPod.Name, container.ID)
continue
}
pod = podSpec
break
}
}

if gracePeriodOverride == nil && pod != nil {
switch {
case pod.DeletionGracePeriodSeconds != nil:
gracePeriodOverride = pod.DeletionGracePeriodSeconds
case pod.Spec.TerminationGracePeriodSeconds != nil:
gracePeriodOverride = pod.Spec.TerminationGracePeriodSeconds
}
}

if gracePeriodOverride == nil || *gracePeriodOverride < minimumGracePeriodInSeconds {
min := int64(minimumGracePeriodInSeconds)
gracePeriodOverride = &min
}

err := m.killPodWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
return err.Error()
}
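The KillPod change above resolves the grace period in a fixed order: an explicit override wins, then the pod's DeletionGracePeriodSeconds, then Spec.TerminationGracePeriodSeconds, and the result is clamped to the kubelet's minimumGracePeriodInSeconds floor. A small made-up helper expressing the same precedence:

package sketch

// resolveGracePeriod mirrors the precedence added to KillPod above:
// explicit override, then deletion grace period, then termination grace
// period, clamped to a minimum floor.
func resolveGracePeriod(override, deletion, termination *int64, minimum int64) int64 {
	switch {
	case override != nil:
		// hard-kill paths (e.g. eviction) pass this in explicitly
	case deletion != nil:
		override = deletion
	case termination != nil:
		override = termination
	}
	if override == nil || *override < minimum {
		return minimum
	}
	return *override
}

With no override and a pod spec grace period of 30 seconds this yields 30; with nothing set at all it falls back to the minimum floor.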