From 77529fb2a826e73a73b70d86bec33c6b27e08e79 Mon Sep 17 00:00:00 2001
From: Taichi Takemura
Date: Fri, 24 May 2024 14:01:06 +0900
Subject: [PATCH] Change the formula cat-gate uses to schedule pods (#20)

* Change the formula cat-gate uses to schedule pods

Signed-off-by: zeroalphat

* Change the style so that the unit comes after the value when calculating durations

Signed-off-by: zeroalphat

* Change the conditional expression of numImagePullingPods

Signed-off-by: zeroalphat

---------

Signed-off-by: zeroalphat
Co-authored-by: Masayuki Ishii
---
 cmd/main.go                                |  6 ++
 config/rbac/role.yaml                      |  8 ++
 e2e/Makefile                               |  2 +-
 internal/constants/constants.go            |  3 +
 internal/controller/pod_controller.go      | 88 +++++++++++++++-----
 internal/controller/pod_controller_test.go | 93 ++++++++++++++++------
 internal/runners/garbage_collector.go      | 44 ++++++++++
 7 files changed, 200 insertions(+), 44 deletions(-)
 create mode 100644 internal/runners/garbage_collector.go

diff --git a/cmd/main.go b/cmd/main.go
index bee55d8..8837766 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -36,6 +36,7 @@ import (
     "github.com/cybozu-go/cat-gate/hooks"
     "github.com/cybozu-go/cat-gate/internal/controller"
     "github.com/cybozu-go/cat-gate/internal/indexing"
+    "github.com/cybozu-go/cat-gate/internal/runners"
     //+kubebuilder:scaffold:imports
 )
 
@@ -114,6 +115,11 @@ func main() {
     }
     //+kubebuilder:scaffold:builder
 
+    if err = mgr.Add(runners.GarbageCollector{}); err != nil {
+        setupLog.Error(err, "unable to add garbage collector")
+        os.Exit(1)
+    }
+
     if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
         setupLog.Error(err, "unable to set up health check")
         os.Exit(1)
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 8b025d9..69c7b51 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -4,6 +4,14 @@ kind: ClusterRole
 metadata:
   name: manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - ""
   resources:
diff --git a/e2e/Makefile b/e2e/Makefile
index c2f7cdb..7a5db9e 100644
--- a/e2e/Makefile
+++ b/e2e/Makefile
@@ -27,7 +27,7 @@ setup:
     mkdir -p $(BIN_DIR)
     $(CURL) -o $(BIN_DIR)/kubectl https://storage.googleapis.com/kubernetes-release/release/v$(E2ETEST_K8S_VERSION)/bin/linux/amd64/kubectl && chmod a+x $(BIN_DIR)/kubectl
     # TODO: specify kind version
-    GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest 
+    GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest
 
 .PHONY: start
 start:
diff --git a/internal/constants/constants.go b/internal/constants/constants.go
index 9cd975e..493e7fe 100644
--- a/internal/constants/constants.go
+++ b/internal/constants/constants.go
@@ -6,3 +6,6 @@ const PodSchedulingGateName = MetaPrefix + "gate"
 const CatGateImagesHashAnnotation = MetaPrefix + "images-hash"
 
 const ImageHashAnnotationField = ".metadata.annotations.images-hash"
+
+const LevelWarning = 1
+const LevelDebug = -1
diff --git a/internal/controller/pod_controller.go b/internal/controller/pod_controller.go
index 92cd347..46d0584 100644
--- a/internal/controller/pod_controller.go
+++ b/internal/controller/pod_controller.go
@@ -18,6 +18,8 @@ package controller
 
 import (
     "context"
+    "slices"
+    "sync"
     "time"
 
     "github.com/cybozu-go/cat-gate/internal/constants"
@@ -42,14 +44,14 @@ const scaleRate = 2
 // minimumCapacity the number of scheduling gates to remove when no node have the image.
const minimumCapacity = 1 -const levelWarning = 1 -const levelDebug = -1 - var requeueSeconds = 10 +var gateRemovalDelayMilliSecond = 10 +var GateRemovalHistories = sync.Map{} //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch //+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch //+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -62,13 +64,17 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, client.IgnoreNotFound(err) } + if !existsSchedulingGate(reqPod) { + return ctrl.Result{}, nil + } + if reqPod.DeletionTimestamp != nil { return ctrl.Result{}, nil } annotations := reqPod.Annotations if _, ok := annotations[constants.CatGateImagesHashAnnotation]; !ok { - logger.V(levelWarning).Info("pod annotation not found") + logger.V(constants.LevelWarning).Info("pod annotation not found") err := r.removeSchedulingGate(ctx, reqPod) if err != nil { logger.Error(err, "failed to remove scheduling gate") @@ -78,6 +84,57 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } reqImagesHash := annotations[constants.CatGateImagesHashAnnotation] + // prevents removing the scheduling gate based on information before the cache is updated. + if value, ok := GateRemovalHistories.Load(reqImagesHash); ok { + lastGateRemovalTime := value.(time.Time) + if time.Since(lastGateRemovalTime) < time.Duration(gateRemovalDelayMilliSecond)*time.Millisecond { + logger.V(constants.LevelDebug).Info("perform retry processing to avoid race conditions", "lastGateRemovalTime", lastGateRemovalTime) + return ctrl.Result{RequeueAfter: time.Duration(gateRemovalDelayMilliSecond) * time.Millisecond}, nil + } + } + + var reqImageList []string + for _, initContainer := range reqPod.Spec.InitContainers { + if initContainer.Image == "" { + continue + } + reqImageList = append(reqImageList, initContainer.Image) + } + for _, container := range reqPod.Spec.Containers { + if container.Image == "" { + continue + } + reqImageList = append(reqImageList, container.Image) + } + + nodes := &corev1.NodeList{} + err = r.List(ctx, nodes) + if err != nil { + logger.Error(err, "failed to list nodes") + return ctrl.Result{}, err + } + + nodeImageSet := make(map[string][]string) + for _, node := range nodes.Items { + for _, image := range node.Status.Images { + nodeImageSet[node.Name] = append(nodeImageSet[node.Name], image.Names...) 
+        }
+    }
+
+    nodeSet := make(map[string]struct{})
+    for nodeName, images := range nodeImageSet {
+        allImageExists := true
+        for _, reqImage := range reqImageList {
+            if !slices.Contains(images, reqImage) {
+                allImageExists = false
+                break
+            }
+        }
+        if allImageExists {
+            nodeSet[nodeName] = struct{}{}
+        }
+    }
+
     pods := &corev1.PodList{}
     err = r.List(ctx, pods, client.MatchingFields{constants.ImageHashAnnotationField: reqImagesHash})
     if err != nil {
@@ -85,8 +142,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
         return ctrl.Result{}, err
     }
 
-    nodeSet := make(map[string]struct{})
-
     numSchedulablePods := 0
     numImagePulledPods := 0
 
@@ -96,17 +151,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
         }
         numSchedulablePods += 1
 
-        allStarted := true
-        statuses := pod.Status.ContainerStatuses
-        for _, status := range statuses {
-            if status.State.Running == nil && status.State.Terminated == nil {
-                allStarted = false
-                break
-            }
-        }
-
-        if allStarted && len(pod.Spec.Containers) == len(statuses) {
-            nodeSet[pod.Status.HostIP] = struct{}{}
+        if pod.Status.Phase != corev1.PodPending {
             numImagePulledPods += 1
         }
     }
@@ -116,10 +161,10 @@
     if capacity < minimumCapacity {
         capacity = minimumCapacity
     }
-    logger.V(levelDebug).Info("schedule capacity", "capacity", capacity, "len(nodeSet)", len(nodeSet))
+    logger.V(constants.LevelDebug).Info("schedule capacity", "capacity", capacity, "len(nodeSet)", len(nodeSet))
 
     numImagePullingPods := numSchedulablePods - numImagePulledPods
-    logger.V(levelDebug).Info("scheduling progress", "numSchedulablePods", numSchedulablePods, "numImagePulledPods", numImagePulledPods, "numImagePullingPods", numImagePullingPods)
+    logger.V(constants.LevelDebug).Info("scheduling progress", "numSchedulablePods", numSchedulablePods, "numImagePulledPods", numImagePulledPods, "numImagePullingPods", numImagePullingPods)
 
     if capacity > numImagePullingPods {
         err := r.removeSchedulingGate(ctx, reqPod)
@@ -127,10 +172,13 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
             logger.Error(err, "failed to remove scheduling gate")
             return ctrl.Result{}, err
         }
+        now := time.Now()
+        GateRemovalHistories.Store(reqImagesHash, now)
+
         return ctrl.Result{}, nil
     }
 
     return ctrl.Result{
-        RequeueAfter: time.Second * time.Duration(requeueSeconds),
+        RequeueAfter: time.Duration(requeueSeconds) * time.Second,
     }, nil
 }
 
diff --git a/internal/controller/pod_controller_test.go b/internal/controller/pod_controller_test.go
index 986fa74..13aa041 100644
--- a/internal/controller/pod_controller_test.go
+++ b/internal/controller/pod_controller_test.go
@@ -92,7 +92,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // no pod is already running, so 1 pods should be scheduled
+        // no nodes with images exist, so 1 pod should be scheduled
         g.Expect(numSchedulable).To(Equal(1))
     }).Should(Succeed())
     scheduleAndStartPods(testName)
@@ -106,7 +106,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
+        // several images exist on 1 node, so 3 (1 + 1*2) pods should be scheduled
         g.Expect(numSchedulable).To(Equal(3))
     }).Should(Succeed())
     scheduleAndStartPods(testName)
@@ -120,7 +120,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 3 pods are already running, so 9 (3 + 3*2) pods should be scheduled
+        // several images exist on 3 nodes, so 9 (3 + 3*2) pods should be scheduled
         g.Expect(numSchedulable).To(Equal(9))
     }).Should(Succeed())
 })
@@ -181,7 +181,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // no pod is already running, so 1 pods should be scheduled
+        // no nodes with images exist, so 1 pod should be scheduled
         g.Expect(numSchedulable).To(Equal(1))
     }).Should(Succeed())
     scheduleAndStartPods(testName)
@@ -195,7 +195,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
+        // several images exist on 1 node, so 3 (1 + 1*2) pods should be scheduled
         g.Expect(numSchedulable).To(Equal(3))
     }).Should(Succeed())
     scheduleAndStartOnePod(testName)
@@ -209,7 +209,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 2 pods are already running, so 6 (2 + 2*2) pods should be scheduled
+        // several images exist on 2 nodes, so 6 (2 + 2*2) pods should be scheduled
         g.Expect(numSchedulable).To(Equal(6))
     }).Should(Succeed())
 })
@@ -245,7 +245,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // no pod is already running, so 1 pods should be scheduled
+        // no nodes with images exist, so 1 pod should be scheduled
         g.Expect(numSchedulable).To(Equal(1))
     }).Should(Succeed())
     scheduleSpecificNodeAndStartOnePod(testName, nodes.Items[0].Name)
@@ -259,7 +259,7 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
+        // several images exist on 1 node, so 3 (1 + 1*2) pods should be scheduled
         g.Expect(numSchedulable).To(Equal(3))
     }).Should(Succeed())
     scheduleSpecificNodeAndStartOnePod(testName, nodes.Items[0].Name)
@@ -273,14 +273,14 @@ var _ = Describe("CatGate controller", func() {
                 numSchedulable += 1
             }
         }
-        // 2 pods are already running on a same node, so 3 pods should be scheduled
+        // several images exist on 1 node, so 3 (1 + 1*2) pods should be scheduled.
+        // as there is only 1 node, it is not possible to schedule more pods than the capacity.
g.Expect(numSchedulable).To(Equal(3)) }).Should(Succeed()) - }) It("Should the schedule not increase if the pod is not Running", func() { - testName := "crash-pod" + testName := "no-start-pod" namespace := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: testName, @@ -291,8 +291,6 @@ var _ = Describe("CatGate controller", func() { for i := 0; i < 3; i++ { createNewPod(testName, i) - } - for i := 0; i < 3; i++ { createNewNode(testName, i) } @@ -306,7 +304,7 @@ var _ = Describe("CatGate controller", func() { numSchedulable += 1 } } - // no pod is already running, so 1 pods should be scheduled + // no nodes with images exist, so 1 pods should be scheduled g.Expect(numSchedulable).To(Equal(1)) }).Should(Succeed()) scheduleAndStartOneUnhealthyPod(testName) @@ -320,8 +318,52 @@ var _ = Describe("CatGate controller", func() { numSchedulable += 1 } } - // 1 pod is already scheduled, but it is not running, so 1 pod should be scheduled - g.Expect(numSchedulable).To(Equal(1)) + // 1 pod is already scheduled, but it is not running, + // so 2(capacity: 2, numImagePullingPods: 2) pod should be scheduled + g.Expect(numSchedulable).To(Equal(2)) + }).Should(Succeed()) + }) + + It("Should more Pods be scheduled if images are present in all nodes, not limited to the number of Pods.", func() { + testName := "already-images-in-node" + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + }, + } + err := k8sClient.Create(ctx, namespace) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < 10; i++ { + createNewNode(testName, i) + } + + nodes := &corev1.NodeList{} + err = k8sClient.List(ctx, nodes) + Expect(err).NotTo(HaveOccurred()) + for _, node := range nodes.Items { + updateNodeImageStatus(&node, []corev1.Container{ + {Image: fmt.Sprintf("%s.example.com/sample1-image:1.0.0", testName)}, // init container image + {Image: fmt.Sprintf("%s.example.com/sample2-image:1.0.0", testName)}, // container image + }) + } + + for i := 0; i < 20; i++ { + createNewPod(testName, i) + } + + pods := &corev1.PodList{} + Eventually(func(g Gomega) { + err := k8sClient.List(ctx, pods, &client.ListOptions{Namespace: testName}) + g.Expect(err).NotTo(HaveOccurred()) + numSchedulable := 0 + for _, pod := range pods.Items { + if !existsSchedulingGate(&pod) { + numSchedulable += 1 + } + } + // image is present in the node, so 20 pods should be scheduled + g.Expect(numSchedulable).To(Equal(20)) }).Should(Succeed()) }) }) @@ -370,11 +412,12 @@ func scheduleAndStartPods(namespace string) { for _, pod := range pods.Items { if !existsSchedulingGate(&pod) { - updatePodStatus(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}) + updatePodStatus(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, corev1.PodRunning) node := &corev1.Node{} err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node) Expect(err).NotTo(HaveOccurred()) + updateNodeImageStatus(node, pod.Spec.InitContainers) updateNodeImageStatus(node, pod.Spec.Containers) } } @@ -387,11 +430,12 @@ func scheduleAndStartOnePod(namespace string) { for _, pod := range pods.Items { if !existsSchedulingGate(&pod) && len(pod.Status.ContainerStatuses) == 0 { - updatePodStatus(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}) + updatePodStatus(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, corev1.PodRunning) node := &corev1.Node{} err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node) Expect(err).NotTo(HaveOccurred()) + 
updateNodeImageStatus(node, pod.Spec.InitContainers) updateNodeImageStatus(node, pod.Spec.Containers) break } @@ -405,10 +449,11 @@ func scheduleSpecificNodeAndStartOnePod(namespace, nodeName string) { for _, pod := range pods.Items { if !existsSchedulingGate(&pod) && len(pod.Status.ContainerStatuses) == 0 { - updatePodStatusWithHostIP(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, nodeName) + updatePodStatusWithHostIP(&pod, corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, corev1.PodRunning, nodeName) node := &corev1.Node{} err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node) Expect(err).NotTo(HaveOccurred()) + updateNodeImageStatus(node, pod.Spec.InitContainers) updateNodeImageStatus(node, pod.Spec.Containers) break } @@ -422,18 +467,18 @@ func scheduleAndStartOneUnhealthyPod(namespace string) { for _, pod := range pods.Items { if !existsSchedulingGate(&pod) && len(pod.Status.ContainerStatuses) == 0 { - updatePodStatus(&pod, corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "RunContainerError"}}) - + updatePodStatus(&pod, corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "RunContainerError"}}, corev1.PodPending) node := &corev1.Node{} err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node) Expect(err).NotTo(HaveOccurred()) + updateNodeImageStatus(node, pod.Spec.InitContainers) updateNodeImageStatus(node, pod.Spec.Containers) break } } } -func updatePodStatus(pod *corev1.Pod, state corev1.ContainerState) { +func updatePodStatus(pod *corev1.Pod, state corev1.ContainerState, phase corev1.PodPhase) { pod.Status.ContainerStatuses = []corev1.ContainerStatus{ { State: state, @@ -445,16 +490,18 @@ func updatePodStatus(pod *corev1.Pod, state corev1.ContainerState) { idx := regex.FindStringIndex(podName) nodeName := podName[:idx[0]] + strings.Replace(podName[idx[0]:], "pod", "node", 1) pod.Status.HostIP = nodeName + pod.Status.Phase = phase err = k8sClient.Status().Update(ctx, pod) Expect(err).NotTo(HaveOccurred()) } -func updatePodStatusWithHostIP(pod *corev1.Pod, state corev1.ContainerState, nodeName string) { +func updatePodStatusWithHostIP(pod *corev1.Pod, state corev1.ContainerState, phase corev1.PodPhase, nodeName string) { pod.Status.ContainerStatuses = []corev1.ContainerStatus{ { State: state, }, } + pod.Status.Phase = phase pod.Status.HostIP = nodeName err := k8sClient.Status().Update(ctx, pod) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/runners/garbage_collector.go b/internal/runners/garbage_collector.go new file mode 100644 index 0000000..c3aecd2 --- /dev/null +++ b/internal/runners/garbage_collector.go @@ -0,0 +1,44 @@ +package runners + +import ( + "context" + "time" + + "github.com/cybozu-go/cat-gate/internal/constants" + "github.com/cybozu-go/cat-gate/internal/controller" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const gcIntervalHours = 24 + +var historyDeletionDuration = 24 * 60 * 60 // 1 day + +type GarbageCollector struct { +} + +func (gc GarbageCollector) NeedLeaderElection() bool { + return true +} + +func (gc GarbageCollector) Start(ctx context.Context) error { + ticker := time.NewTicker(time.Hour * gcIntervalHours) + defer ticker.Stop() + logger := log.FromContext(ctx) + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + controller.GateRemovalHistories.Range(func(imageHash, value interface{}) bool { + lastGateRemovalTime := value.(time.Time) + // Delete history that has not been updated for a long time to prevent memory 
leaks. + if time.Since(lastGateRemovalTime) > time.Duration(historyDeletionDuration)*time.Second { + logger.V(constants.LevelDebug).Info("delete old history", "image hash", imageHash, "lastGateRemovalTime", lastGateRemovalTime) + controller.GateRemovalHistories.Delete(imageHash) + } + return true + }) + } + } +}
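
Reviewer note (not part of the patch): the heart of this change is the new capacity formula in pod_controller.go. The reconciler now counts the nodes that already hold every image the pod requests, multiplies that count by scaleRate (flooring at minimumCapacity), and removes another scheduling gate only while the number of pods still pulling images stays below that capacity. The sketch below restates the formula outside the controller so it can be read in isolation; the helper names (nodesWithAllImages, canRemoveGate) and the sample image names are illustrative and do not exist in the repository.

package main

import (
	"fmt"
	"slices"
)

const (
	scaleRate       = 2 // same constants as in pod_controller.go
	minimumCapacity = 1
)

// nodesWithAllImages counts nodes whose image list already contains every requested image.
func nodesWithAllImages(nodeImages map[string][]string, required []string) int {
	count := 0
	for _, images := range nodeImages {
		hasAll := true
		for _, req := range required {
			if !slices.Contains(images, req) {
				hasAll = false
				break
			}
		}
		if hasAll {
			count++
		}
	}
	return count
}

// canRemoveGate mirrors the patched condition: remove another scheduling gate
// only while capacity exceeds the number of pods that are still pulling images.
func canRemoveGate(readyNodes, schedulablePods, imagePulledPods int) bool {
	capacity := readyNodes * scaleRate
	if capacity < minimumCapacity {
		capacity = minimumCapacity
	}
	imagePullingPods := schedulablePods - imagePulledPods
	return capacity > imagePullingPods
}

func main() {
	// Hypothetical cluster state: only node-1 already caches both requested images.
	nodeImages := map[string][]string{
		"node-1": {"registry.example.com/sample1-image:1.0.0", "registry.example.com/sample2-image:1.0.0"},
		"node-2": {"registry.example.com/sample1-image:1.0.0"},
	}
	required := []string{"registry.example.com/sample1-image:1.0.0", "registry.example.com/sample2-image:1.0.0"}

	ready := nodesWithAllImages(nodeImages, required) // 1
	fmt.Println(canRemoveGate(ready, 3, 1))           // capacity 2 vs 2 pods still pulling -> false
}

Because capacity is derived from nodes that already cache the images rather than from running pods, gates can be released even before any pod has started, which is the behavior the new already-images-in-node test case exercises.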