diff --git a/internal/controller/mantlebackup_controller_test.go b/internal/controller/mantlebackup_controller_test.go index 3fe8f183..e59007e4 100644 --- a/internal/controller/mantlebackup_controller_test.go +++ b/internal/controller/mantlebackup_controller_test.go @@ -22,10 +22,12 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/kube-openapi/pkg/validation/strfmt" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" @@ -1129,21 +1131,25 @@ var _ = Describe("export", func() { var mockCtrl *gomock.Controller var grpcClient *proto.MockMantleServiceClient var mbr *MantleBackupReconciler - var ns string + var nsController, ns string + var dummyPVCManifest, dummyPVManifest []byte BeforeEach(func() { var t reporter mockCtrl = gomock.NewController(t) grpcClient = proto.NewMockMantleServiceClient(mockCtrl) + nsController = resMgr.CreateNamespace() + mbr = NewMantleBackupReconciler( k8sClient, scheme.Scheme, - resMgr.ClusterID, + nsController, RolePrimary, &PrimarySettings{ Client: grpcClient, ExportDataStorageClass: resMgr.StorageClassName, + MaxExportJobs: 1, }, "dummy image", "", @@ -1151,15 +1157,9 @@ var _ = Describe("export", func() { ) ns = resMgr.CreateNamespace() - }) - AfterEach(func() { - if mockCtrl != nil { - mockCtrl.Finish() - } - }) + var err error - It("should set correct annotations after export() is called", func(ctx SpecContext) { pvc := corev1.PersistentVolumeClaim{ Spec: corev1.PersistentVolumeClaimSpec{ Resources: corev1.VolumeResourceRequirements{ @@ -1169,7 +1169,7 @@ var _ = Describe("export", func() { }, }, } - pvcManifest, err := json.Marshal(pvc) + dummyPVCManifest, err = json.Marshal(pvc) Expect(err).NotTo(HaveOccurred()) pv := corev1.PersistentVolume{ @@ -1184,8 +1184,18 @@ var _ = Describe("export", func() { }, }, } - pvManifest, err := json.Marshal(pv) + dummyPVManifest, err = json.Marshal(pv) Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + if mockCtrl != nil { + mockCtrl.Finish() + } + }) + + It("should set correct annotations after export() is called", func(ctx SpecContext) { + var err error // test a full backup target := &mantlev1.MantleBackup{ @@ -1201,8 +1211,8 @@ var _ = Describe("export", func() { err = k8sClient.Create(ctx, target) Expect(err).NotTo(HaveOccurred()) err = updateStatus(ctx, k8sClient, target, func() error { - target.Status.PVManifest = string(pvManifest) - target.Status.PVCManifest = string(pvcManifest) + target.Status.PVManifest = string(dummyPVManifest) + target.Status.PVCManifest = string(dummyPVCManifest) return nil }) Expect(err).NotTo(HaveOccurred()) @@ -1237,9 +1247,9 @@ var _ = Describe("export", func() { } err = k8sClient.Create(ctx, target2) Expect(err).NotTo(HaveOccurred()) - err = updateStatus(ctx, k8sClient, target2, func() error { - target2.Status.PVManifest = string(pvManifest) - target2.Status.PVCManifest = string(pvcManifest) + err = updateStatus(context.Background(), k8sClient, target2, func() error { + target2.Status.PVManifest = string(dummyPVManifest) + target2.Status.PVCManifest = string(dummyPVCManifest) return nil }) Expect(err).NotTo(HaveOccurred()) @@ -1269,4 +1279,89 @@ var _ = Describe("export", func() { Expect(ok).To(BeTrue()) Expect(diffFrom).To(Equal(target.GetName())) }) + + It("should throttle export jobs correctly", func(ctx SpecContext) { + var err error + + getNumOfExportJobs := func(ns string) (int, error) { + var jobs batchv1.JobList + err := k8sClient.List(ctx, &jobs, &client.ListOptions{ + Namespace: ns, + LabelSelector: labels.SelectorFromSet(map[string]string{ + "app.kubernetes.io/name": labelAppNameValue, + "app.kubernetes.io/component": labelComponentExportJob, + }), + }) + return len(jobs.Items), err + } + + createAndExportMantleBackup := func(mbr *MantleBackupReconciler, name, ns string) { + target := &mantlev1.MantleBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: mantlev1.MantleBackupSpec{ + PVC: "dummy", + Expire: "1d", + }, + } + err = k8sClient.Create(ctx, target) + Expect(err).NotTo(HaveOccurred()) + + err = updateStatus(ctx, k8sClient, target, func() error { + target.Status.PVManifest = string(dummyPVManifest) + target.Status.PVCManifest = string(dummyPVCManifest) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + grpcClient.EXPECT().SetSynchronizing(gomock.Any(), gomock.Any()). + Times(1).Return(&proto.SetSynchronizingResponse{}, nil) + + _, err = mbr.export(ctx, target, &dataSyncPrepareResult{ + isIncremental: false, + isSecondaryMantleBackupReadyToUse: false, + diffFrom: nil, + }) + Expect(err).NotTo(HaveOccurred()) + } + + // create 5 different MantleBackup resources and call export() for each of them + for i := 0; i < 5; i++ { + createAndExportMantleBackup(mbr, fmt.Sprintf("target1-%d", i), ns) + } + + // make sure that only 1 Job is created + Consistently(ctx, func(g Gomega) error { + numJobs, err := getNumOfExportJobs(nsController) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(numJobs).To(Equal(1)) + return nil + }, "1s").Should(Succeed()) + + // make sure that another mantle-controller existing in a different namespace can create an export Job. + ns2 := resMgr.CreateNamespace() + mbr2 := NewMantleBackupReconciler( + k8sClient, + scheme.Scheme, + ns2, + RolePrimary, + &PrimarySettings{ + Client: grpcClient, + ExportDataStorageClass: resMgr.StorageClassName, + MaxExportJobs: 1, + }, + "dummy image", + "", + nil, + ) + createAndExportMantleBackup(mbr2, "target2", ns2) + Eventually(ctx, func(g Gomega) error { + numJobs, err := getNumOfExportJobs(ns2) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(numJobs).To(Equal(1)) + return nil + }).Should(Succeed()) + }) })