From 74b734b69d4e8220eee74e491f3e2cd12fd75f7a Mon Sep 17 00:00:00 2001 From: zeroalphat Date: Tue, 14 May 2024 11:38:51 +0900 Subject: [PATCH] Change the formula cat-gate uses to schedule pods Signed-off-by: zeroalphat --- config/rbac/role.yaml | 8 ++++ e2e/Makefile | 2 +- internal/controller/pod_controller.go | 41 ++++++++++++++++-- internal/controller/pod_controller_test.go | 49 +++++++++++++++++++--- 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 8b025d9..69c7b51 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,14 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/e2e/Makefile b/e2e/Makefile index c2f7cdb..7a5db9e 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -27,7 +27,7 @@ setup: mkdir -p $(BIN_DIR) $(CURL) -o $(BIN_DIR)/kubectl https://storage.googleapis.com/kubernetes-release/release/v$(E2ETEST_K8S_VERSION)/bin/linux/amd64/kubectl && chmod a+x $(BIN_DIR)/kubectl # TODO: specify kind version - GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest + GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest .PHONY: start start: diff --git a/internal/controller/pod_controller.go b/internal/controller/pod_controller.go index 92cd347..fec3e3b 100644 --- a/internal/controller/pod_controller.go +++ b/internal/controller/pod_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "slices" "time" "github.com/cybozu-go/cat-gate/internal/constants" @@ -50,6 +51,7 @@ var requeueSeconds = 10 //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch //+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch //+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update +//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -78,6 +80,42 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } reqImagesHash := annotations[constants.CatGateImagesHashAnnotation] + var reqImageList []string + for _, container := range reqPod.Spec.Containers { + if container.Image == "" { + continue + } + reqImageList = append(reqImageList, container.Image) + } + + nodes := &corev1.NodeList{} + err = r.List(ctx, nodes) + if err != nil { + logger.Error(err, "failed to list nodes") + return ctrl.Result{}, err + } + + nodeImageSet := make(map[string][]string) + for _, node := range nodes.Items { + for _, image := range node.Status.Images { + nodeImageSet[node.Name] = append(nodeImageSet[node.Name], image.Names...) + } + } + + nodeSet := make(map[string]struct{}) + for nodeName, images := range nodeImageSet { + allImageExists := true + for _, reqImage := range reqImageList { + if !slices.Contains(images, reqImage) { + allImageExists = false + break + } + } + if allImageExists { + nodeSet[nodeName] = struct{}{} + } + } + pods := &corev1.PodList{} err = r.List(ctx, pods, client.MatchingFields{constants.ImageHashAnnotationField: reqImagesHash}) if err != nil { @@ -85,8 +123,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, err } - nodeSet := make(map[string]struct{}) - numSchedulablePods := 0 numImagePulledPods := 0 @@ -106,7 +142,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } if allStarted && len(pod.Spec.Containers) == len(statuses) { - nodeSet[pod.Status.HostIP] = struct{}{} numImagePulledPods += 1 } } diff --git a/internal/controller/pod_controller_test.go b/internal/controller/pod_controller_test.go index 986fa74..69f031f 100644 --- a/internal/controller/pod_controller_test.go +++ b/internal/controller/pod_controller_test.go @@ -15,10 +15,8 @@ import ( ) var _ = Describe("CatGate controller", func() { - ctx := context.Background() requeueSeconds = 1 - It("should schedule a pod if it is created solely", func() { testName := "single-pod" namespace := &corev1.Namespace{ @@ -279,7 +277,7 @@ var _ = Describe("CatGate controller", func() { }) - It("Should the schedule not increase if the pod is not Running", func() { + It("Should the schedule increase if the pod is not Running", func() { testName := "crash-pod" namespace := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -320,10 +318,52 @@ var _ = Describe("CatGate controller", func() { numSchedulable += 1 } } - // 1 pod is already scheduled, but it is not running, so 1 pod should be scheduled + // 1 pod is not Running, so 1 pod should be scheduled g.Expect(numSchedulable).To(Equal(1)) }).Should(Succeed()) }) + + It("Should more Pods be scheduled if images are present in a node, not limited to the number of Pods.", func() { + testName := "already-images-in-node" + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + }, + } + err := k8sClient.Create(ctx, namespace) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < 10; i++ { + createNewNode(testName, i) + } + + nodes := &corev1.NodeList{} + err = k8sClient.List(ctx, nodes) + Expect(err).NotTo(HaveOccurred()) + for _, node := range nodes.Items { + updateNodeImageStatus(&node, []corev1.Container{{Image: fmt.Sprintf("%s.example.com/sample1-image:1.0.0", testName)}}) + updateNodeImageStatus(&node, []corev1.Container{{Image: fmt.Sprintf("%s.example.com/sample2-image:1.0.0", testName)}}) + } + + for i := 0; i < 10; i++ { + createNewPod(testName, i) + } + + pods := &corev1.PodList{} + Eventually(func(g Gomega) { + err := k8sClient.List(ctx, pods, &client.ListOptions{Namespace: testName}) + g.Expect(err).NotTo(HaveOccurred()) + numSchedulable := 0 + for _, pod := range pods.Items { + if !existsSchedulingGate(&pod) { + numSchedulable += 1 + } + } + // image is present in the node, so 10 pods should be scheduled + g.Expect(numSchedulable).To(Equal(10)) + }).Should(Succeed()) + }) + }) func createNewPod(testName string, index int) *corev1.Pod { @@ -423,7 +463,6 @@ func scheduleAndStartOneUnhealthyPod(namespace string) { for _, pod := range pods.Items { if !existsSchedulingGate(&pod) && len(pod.Status.ContainerStatuses) == 0 { updatePodStatus(&pod, corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "RunContainerError"}}) - node := &corev1.Node{} err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node) Expect(err).NotTo(HaveOccurred())