
Change the formula cat-gate uses to schedule pods
Signed-off-by: zeroalphat <taichi-takemura@cybozu.co.jp>
zeroalphat committed May 21, 2024
1 parent 67a5a1e commit f4d12f8
Showing 6 changed files with 174 additions and 21 deletions.
6 changes: 6 additions & 0 deletions cmd/main.go
@@ -36,6 +36,7 @@ import (
"github.com/cybozu-go/cat-gate/hooks"
"github.com/cybozu-go/cat-gate/internal/controller"
"github.com/cybozu-go/cat-gate/internal/indexing"
"github.com/cybozu-go/cat-gate/internal/runners"
//+kubebuilder:scaffold:imports
)

@@ -114,6 +115,11 @@ func main() {
}
//+kubebuilder:scaffold:builder

if err = mgr.Add(runners.GarbageCollector{}); err != nil {
setupLog.Error(err, "unable to add garbage collector")
os.Exit(1)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
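For context, `mgr.Add` accepts anything satisfying controller-runtime's `Runnable` interface; the `GarbageCollector` added under `internal/runners` (full file at the end of this commit) satisfies it via its `Start` method and opts into leader election through `NeedLeaderElection`. The sketch below paraphrases the relevant interfaces for orientation only; see the `sigs.k8s.io/controller-runtime/pkg/manager` package for the authoritative definitions.

```go
// Paraphrased from sigs.k8s.io/controller-runtime/pkg/manager; shown only to
// explain why GarbageCollector can be passed to mgr.Add.
package manager

import "context"

// Runnable is a component the manager starts and stops with its own lifecycle.
type Runnable interface {
	Start(ctx context.Context) error
}

// LeaderElectionRunnable lets a Runnable declare whether it should run only on
// the elected leader; GarbageCollector returns true, as shown below.
type LeaderElectionRunnable interface {
	NeedLeaderElection() bool
}
```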
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -4,6 +4,14 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
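The `nodes` rule above is generated rather than hand-written: it comes from the kubebuilder RBAC marker added to `internal/controller/pod_controller.go` in this commit, after regenerating manifests with controller-gen (typically `make manifests` in kubebuilder-scaffolded projects). A minimal excerpt showing the marker in place:

```go
// internal/controller/pod_controller.go (excerpt): controller-gen expands this
// marker into the nodes rule added to config/rbac/role.yaml above.
package controller

//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch
```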
2 changes: 1 addition & 1 deletion e2e/Makefile
@@ -27,7 +27,7 @@ setup:
mkdir -p $(BIN_DIR)
$(CURL) -o $(BIN_DIR)/kubectl https://storage.googleapis.com/kubernetes-release/release/v$(E2ETEST_K8S_VERSION)/bin/linux/amd64/kubectl && chmod a+x $(BIN_DIR)/kubectl
# TODO: specify kind version
GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest
GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest

.PHONY: start
start:
60 changes: 57 additions & 3 deletions internal/controller/pod_controller.go
@@ -18,6 +18,8 @@ package controller

import (
"context"
"slices"
"sync"
"time"

"github.com/cybozu-go/cat-gate/internal/constants"
@@ -46,10 +48,13 @@ const levelWarning = 1
const levelDebug = -1

var requeueSeconds = 10
var gateRemovalDelayMilliSecond = 10
var GateRemovalHistories = sync.Map{}

//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -62,6 +67,10 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !existsSchedulingGate(reqPod) {
return ctrl.Result{}, nil
}

if reqPod.DeletionTimestamp != nil {
return ctrl.Result{}, nil
}
@@ -78,15 +87,58 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
reqImagesHash := annotations[constants.CatGateImagesHashAnnotation]

// Prevents removing the scheduling gate based on stale information read before the cache is updated.
if value, ok := GateRemovalHistories.Load(reqImagesHash); ok {
lastGateRemovalTime := time.UnixMilli(value.(int64))
if time.Since(lastGateRemovalTime) < time.Millisecond*time.Duration(gateRemovalDelayMilliSecond) {
logger.V(levelDebug).Info("perform retry processing to avoid race conditions", "lastGateRemovalTime", lastGateRemovalTime)
return ctrl.Result{RequeueAfter: time.Millisecond * time.Duration(gateRemovalDelayMilliSecond)}, nil
}
}

var reqImageList []string
for _, container := range reqPod.Spec.Containers {
if container.Image == "" {
continue
}
reqImageList = append(reqImageList, container.Image)
}

nodes := &corev1.NodeList{}
err = r.List(ctx, nodes)
if err != nil {
logger.Error(err, "failed to list nodes")
return ctrl.Result{}, err
}

nodeImageSet := make(map[string][]string)
for _, node := range nodes.Items {
for _, image := range node.Status.Images {
nodeImageSet[node.Name] = append(nodeImageSet[node.Name], image.Names...)
}
}

nodeSet := make(map[string]struct{})
for nodeName, images := range nodeImageSet {
allImageExists := true
for _, reqImage := range reqImageList {
if !slices.Contains(images, reqImage) {
allImageExists = false
break
}
}
if allImageExists {
nodeSet[nodeName] = struct{}{}
}
}

pods := &corev1.PodList{}
err = r.List(ctx, pods, client.MatchingFields{constants.ImageHashAnnotationField: reqImagesHash})
if err != nil {
logger.Error(err, "failed to list pods")
return ctrl.Result{}, err
}

nodeSet := make(map[string]struct{})

numSchedulablePods := 0
numImagePulledPods := 0

@@ -106,7 +158,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

if allStarted && len(pod.Spec.Containers) == len(statuses) {
nodeSet[pod.Status.HostIP] = struct{}{}
numImagePulledPods += 1
}
}
@@ -127,6 +178,9 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
logger.Error(err, "failed to remove scheduling gate")
return ctrl.Result{}, err
}
now := time.Now().UnixMilli()
GateRemovalHistories.Store(reqImagesHash, now)
return ctrl.Result{}, nil
}

return ctrl.Result{
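The hunks above change what the controller counts when deciding how many scheduling gates to remove: instead of deriving node coverage from pods that have already started (the removed `nodeSet[pod.Status.HostIP]` line), it now lists nodes and keeps only those whose `status.images` already contain every image the requesting pod needs. The test expectations below follow the resulting growth pattern (1 pod with no matching nodes, then 3 = 1 + 1×2, 6 = 2 + 2×2, 9 = 3 + 3×2 as matching nodes appear). The standalone sketch below reproduces just the image-matching step with made-up node and image names; it is an illustration, not the controller code itself.

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	// Images required by the gated pod, taken from its container specs.
	reqImageList := []string{"registry.example.com/app:1.0.0"}

	// Images reported on each node via node.Status.Images (sample data).
	nodeImageSet := map[string][]string{
		"node-1": {"registry.example.com/app:1.0.0", "registry.example.com/base:1.0.0"},
		"node-2": {"registry.example.com/base:1.0.0"},
	}

	// A node qualifies only if it already holds every required image.
	nodeSet := make(map[string]struct{})
	for nodeName, images := range nodeImageSet {
		allImageExists := true
		for _, reqImage := range reqImageList {
			if !slices.Contains(images, reqImage) {
				allImageExists = false
				break
			}
		}
		if allImageExists {
			nodeSet[nodeName] = struct{}{}
		}
	}

	fmt.Println(len(nodeSet)) // 1: only node-1 already has all required images
}
```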
75 changes: 58 additions & 17 deletions internal/controller/pod_controller_test.go
@@ -92,7 +92,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// no pod is already running, so 1 pods should be scheduled
// no nodes with images exist, so 1 pod should be scheduled
g.Expect(numSchedulable).To(Equal(1))
}).Should(Succeed())
scheduleAndStartPods(testName)
@@ -106,7 +106,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
// several images exist on 1 node, so 3(1 + 1*2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(3))
}).Should(Succeed())
scheduleAndStartPods(testName)
@@ -120,7 +120,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 3 pods are already running, so 9 (3 + 3*2) pods should be scheduled
// several images exist on 3 nodes, so 9 (3 + 3*2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(9))
}).Should(Succeed())
})
@@ -181,7 +188,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// no pod is already running, so 1 pods should be scheduled
// no nodes with images exist, so 1 pod should be scheduled
g.Expect(numSchedulable).To(Equal(1))
}).Should(Succeed())
scheduleAndStartPods(testName)
@@ -195,7 +195,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
// several images exist on 1 node, so 3(1 + 1*2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(3))
}).Should(Succeed())
scheduleAndStartOnePod(testName)
@@ -209,7 +209,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 2 pods are already running, so 6 (2 + 2*2) pods should be scheduled
// several images exist on 2 nodes, so 6 (2 + 2*2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(6))
}).Should(Succeed())
})
@@ -245,7 +245,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// no pod is already running, so 1 pods should be scheduled
// no nodes with images exist, so 1 pod should be scheduled
g.Expect(numSchedulable).To(Equal(1))
}).Should(Succeed())
scheduleSpecificNodeAndStartOnePod(testName, nodes.Items[0].Name)
@@ -259,7 +259,7 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 1 pod is already running, so 3(1 + 1*2) pods should be scheduled
// several images exist on 1 node, so 3(1 + 1*2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(3))
}).Should(Succeed())
scheduleSpecificNodeAndStartOnePod(testName, nodes.Items[0].Name)
@@ -273,14 +273,14 @@ var _ = Describe("CatGate controller", func() {
numSchedulable += 1
}
}
// 2 pods are already running on a same node, so 3 pods should be scheduled
// several images exist on 1 node, so 3(1 + 1*2) pods should be scheduled.
// As there is only 1 node, it is not possible to schedule more pods than capacity.
g.Expect(numSchedulable).To(Equal(3))
}).Should(Succeed())

})

It("Should the schedule not increase if the pod is not Running", func() {
testName := "crash-pod"
testName := "no-start-pod"
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
@@ -291,8 +291,6 @@

for i := 0; i < 3; i++ {
createNewPod(testName, i)
}
for i := 0; i < 3; i++ {
createNewNode(testName, i)
}

@@ -306,7 +304,7 @@
numSchedulable += 1
}
}
// no pod is already running, so 1 pods should be scheduled
// no nodes with images exist, so 1 pod should be scheduled
g.Expect(numSchedulable).To(Equal(1))
}).Should(Succeed())
scheduleAndStartOneUnhealthyPod(testName)
@@ -320,8 +318,52 @@
numSchedulable += 1
}
}
// 1 pod is already scheduled, but it is not running, so 1 pod should be scheduled
g.Expect(numSchedulable).To(Equal(1))
// 1 pod is already scheduled, but it is not running,
// so 2 (capacity: 2, numImagePullingPods: 2) pods should be scheduled
g.Expect(numSchedulable).To(Equal(2))
}).Should(Succeed())
})

It("Should more Pods be scheduled if images are present in a node, not limited to the number of Pods.", func() {
testName := "already-images-in-node"
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
},
}
err := k8sClient.Create(ctx, namespace)
Expect(err).NotTo(HaveOccurred())

for i := 0; i < 10; i++ {
createNewNode(testName, i)
}

nodes := &corev1.NodeList{}
err = k8sClient.List(ctx, nodes)
Expect(err).NotTo(HaveOccurred())
for _, node := range nodes.Items {
updateNodeImageStatus(&node, []corev1.Container{
{Image: fmt.Sprintf("%s.example.com/sample1-image:1.0.0", testName)},
{Image: fmt.Sprintf("%s.example.com/sample2-image:1.0.0", testName)},
})
}

for i := 0; i < 10; i++ {
createNewPod(testName, i)
}

pods := &corev1.PodList{}
Eventually(func(g Gomega) {
err := k8sClient.List(ctx, pods, &client.ListOptions{Namespace: testName})
g.Expect(err).NotTo(HaveOccurred())
numSchedulable := 0
for _, pod := range pods.Items {
if !existsSchedulingGate(&pod) {
numSchedulable += 1
}
}
// the images are already present on the nodes, so all 10 pods should be scheduled
g.Expect(numSchedulable).To(Equal(10))
}).Should(Succeed())
})
})
@@ -423,7 +465,6 @@ func scheduleAndStartOneUnhealthyPod(namespace string) {
for _, pod := range pods.Items {
if !existsSchedulingGate(&pod) && len(pod.Status.ContainerStatuses) == 0 {
updatePodStatus(&pod, corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "RunContainerError"}})

node := &corev1.Node{}
err = k8sClient.Get(ctx, client.ObjectKey{Name: pod.Status.HostIP}, node)
Expect(err).NotTo(HaveOccurred())
44 changes: 44 additions & 0 deletions internal/runners/garbage_collector.go
@@ -0,0 +1,44 @@
package runners

import (
"context"
"time"

"github.com/cybozu-go/cat-gate/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const levelDebug = -1
const gcIntervalHours = 24

var historyDeletionDuration = 24 * 60 * 60 // 1 day

type GarbageCollector struct {
}

func (gc GarbageCollector) NeedLeaderElection() bool {
return true
}

func (gc GarbageCollector) Start(ctx context.Context) error {
ticker := time.NewTicker(time.Hour * gcIntervalHours)
defer ticker.Stop()
logger := log.FromContext(ctx)

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
controller.GateRemovalHistories.Range(func(imageHash, value interface{}) bool {
lastGateRemovalTime := time.UnixMilli(value.(int64))
// Delete history that has not been updated for a long time to prevent memory leaks.
if time.Since(lastGateRemovalTime) > time.Second*time.Duration(historyDeletionDuration) {
logger.V(levelDebug).Info("delete old history", "image hash", imageHash, "lastGateRemovalTime", lastGateRemovalTime)
controller.GateRemovalHistories.Delete(imageHash)
}
return true
})
}
}
}
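The collector's only job is to drop `GateRemovalHistories` entries whose last gate-removal timestamp is older than `historyDeletionDuration`. The self-contained sketch below exercises the same pruning predicate against a plain `sync.Map` with sample keys (the hash names are made up); it is an illustration of the logic, not a test from this commit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	histories := sync.Map{}
	// A hash whose gate was removed two days ago, and one removed just now.
	histories.Store("hash-old", time.Now().Add(-48*time.Hour).UnixMilli())
	histories.Store("hash-new", time.Now().UnixMilli())

	historyDeletionDuration := 24 * 60 * 60 // 1 day, as in the collector above

	// Same predicate as the Range callback: prune entries that are too old.
	histories.Range(func(key, value interface{}) bool {
		lastGateRemovalTime := time.UnixMilli(value.(int64))
		if time.Since(lastGateRemovalTime) > time.Second*time.Duration(historyDeletionDuration) {
			histories.Delete(key)
			fmt.Println("pruned", key) // prints "pruned hash-old"
		}
		return true
	})
}
```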
