diff --git a/.github/workflows/e2e-multiple-k8s-clusters.yaml b/.github/workflows/e2e-multiple-k8s-clusters.yaml index 7ee443f4..ae1ffd2d 100644 --- a/.github/workflows/e2e-multiple-k8s-clusters.yaml +++ b/.github/workflows/e2e-multiple-k8s-clusters.yaml @@ -19,6 +19,14 @@ concurrency: jobs: build: + strategy: + fail-fast: false + matrix: + package: + - replication + - changetostandalone + - changetoprimary + - changetosecondary #runs-on: "ubuntu-22.04" runs-on: mantle_large_runner_16core steps: @@ -29,4 +37,4 @@ jobs: - run: sudo apt-get update - uses: ./.github/actions/set-up-kvm-for-e2e-tests - run: make -C test/e2e setup - - run: make -C test/e2e test-multiple-k8s-clusters + - run: make -C test/e2e test-multiple-k8s-clusters TEST_MULTIK8S_PACKAGES=${{ matrix.package }} diff --git a/charts/mantle/templates/deployment.yaml b/charts/mantle/templates/deployment.yaml index 9fa3a7af..44305a01 100644 --- a/charts/mantle/templates/deployment.yaml +++ b/charts/mantle/templates/deployment.yaml @@ -108,6 +108,9 @@ spec: fieldPath: metadata.namespace - name: POD_IMAGE value: {{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }} + {{- range .Values.controller.env }} + - {{ toYaml . | nindent 14 }} + {{- end }} ports: {{- toYaml .Values.controller.ports | nindent 12 }} - command: diff --git a/charts/mantle/values.yaml b/charts/mantle/values.yaml index d08eaa5a..df23cd0d 100644 --- a/charts/mantle/values.yaml +++ b/charts/mantle/values.yaml @@ -25,6 +25,7 @@ affinity: {} controller: role: standalone ports: [] + env: [] secondaryService: # type: diff --git a/internal/controller/mantlebackup_controller.go b/internal/controller/mantlebackup_controller.go index c1dde3b8..92e602f7 100644 --- a/internal/controller/mantlebackup_controller.go +++ b/internal/controller/mantlebackup_controller.go @@ -262,7 +262,7 @@ func (r *MantleBackupReconciler) getSnapshotTarget(ctx context.Context, backup * } if !ok { logger.Info("waiting for PVC bound.") - return nil, ctrl.Result{Requeue: true}, nil + return nil, requeueReconciliation(), nil } pvName := pvc.Spec.VolumeName @@ -392,7 +392,7 @@ func (r *MantleBackupReconciler) reconcileAsStandalone(ctx context.Context, back default: return ctrl.Result{}, getSnapshotTargetErr } - if result.Requeue { + if !result.IsZero() { return result, nil } @@ -415,7 +415,7 @@ func (r *MantleBackupReconciler) reconcileAsStandalone(ctx context.Context, back if err != nil { return ctrl.Result{}, err } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if err := r.expire(ctx, backup); err != nil { @@ -461,7 +461,7 @@ func (r *MantleBackupReconciler) reconcileAsSecondary(ctx context.Context, backu default: return ctrl.Result{}, getSnapshotTargetErr } - if result.Requeue { + if !result.IsZero() { return result, nil } @@ -564,7 +564,7 @@ func (r *MantleBackupReconciler) replicateManifests( *backup1.Status.SnapID < *backup.Status.SnapID && backup1.ObjectMeta.DeletionTimestamp.IsZero() && !meta.IsStatusConditionTrue(backup1.Status.Conditions, mantlev1.BackupConditionSyncedToRemote) { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } } @@ -703,7 +703,7 @@ func (r *MantleBackupReconciler) finalizeStandalone( ) (ctrl.Result, error) { logger := log.FromContext(ctx) if _, ok := backup.GetAnnotations()[annotDiffTo]; ok { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if !controllerutil.ContainsFinalizer(backup, MantleBackupFinalizerName) { @@ -741,7 +741,7 @@ func (r 
*MantleBackupReconciler) finalizeSecondary( ) (ctrl.Result, error) { logger := log.FromContext(ctx) if _, ok := backup.GetAnnotations()[annotDiffTo]; ok { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if !controllerutil.ContainsFinalizer(backup, MantleBackupFinalizerName) { @@ -965,7 +965,7 @@ func (r *MantleBackupReconciler) export( return ctrl.Result{}, err } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } func (r *MantleBackupReconciler) annotateExportTargetMantleBackup( @@ -1025,7 +1025,7 @@ func (r *MantleBackupReconciler) checkIfNewJobCanBeCreated(ctx context.Context) } if len(jobs.Items) >= r.primarySettings.MaxExportJobs { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } return ctrl.Result{}, nil @@ -1271,7 +1271,7 @@ func (r *MantleBackupReconciler) checkIfExportJobIsCompleted( return ctrl.Result{}, nil } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } func (r *MantleBackupReconciler) createOrUpdateExportDataUploadJob(ctx context.Context, target *mantlev1.MantleBackup) error { @@ -1422,7 +1422,7 @@ func (r *MantleBackupReconciler) startImport( ) (ctrl.Result, error) { if !r.doesMantleBackupHaveSyncModeAnnot(backup) { // SetSynchronizingg is not called yet or the cache is stale. - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if result, err := r.isExportDataAlreadyUploaded(ctx, backup); err != nil || !result.IsZero() { @@ -1431,7 +1431,7 @@ func (r *MantleBackupReconciler) startImport( // Requeue if the PV is smaller than the PVC. (This may be the case if pvc-autoresizer is used.) if isPVSmallerThanPVC(target.pv, target.pvc) { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if err := r.updateStatusManifests(ctx, backup, target.pv, target.pvc); err != nil { @@ -1528,7 +1528,7 @@ func (r *MantleBackupReconciler) isExportDataAlreadyUploaded( if uploaded { return ctrl.Result{}, nil } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } func isPVSmallerThanPVC( @@ -1592,7 +1592,7 @@ func (r *MantleBackupReconciler) reconcileDiscardJob( if completed { return ctrl.Result{}, nil } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } func (r *MantleBackupReconciler) createOrUpdateDiscardPV( @@ -1777,11 +1777,11 @@ func (r *MantleBackupReconciler) reconcileImportJob( if err := r.createOrImportJob(ctx, backup, snapshotTarget); err != nil { return ctrl.Result{}, err } - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } if !IsJobConditionTrue(job.Status.Conditions, batchv1.JobComplete) { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } snapshot, err := ceph.FindRBDSnapshot( diff --git a/internal/controller/mantlerestore_controller.go b/internal/controller/mantlerestore_controller.go index 15d8a997..b4731c59 100644 --- a/internal/controller/mantlerestore_controller.go +++ b/internal/controller/mantlerestore_controller.go @@ -137,7 +137,7 @@ func (r *MantleRestoreReconciler) restore(ctx context.Context, restore *mantlev1 // check if the backup is ReadyToUse if !meta.IsStatusConditionTrue(backup.Status.Conditions, mantlev1.BackupConditionReadyToUse) { logger.Info("backup is not ready to use", "backup", backup.Name, "namespace", backup.Namespace) - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } // store the pool name in the status diff --git 
a/internal/controller/persistentvolume_controller.go b/internal/controller/persistentvolume_controller.go index b12a8366..5e83424f 100644 --- a/internal/controller/persistentvolume_controller.go +++ b/internal/controller/persistentvolume_controller.go @@ -89,7 +89,7 @@ func (r *PersistentVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Req // Wait until the PV's status becomes Released. if pv.Status.Phase != corev1.VolumeReleased { - return ctrl.Result{Requeue: true}, nil + return requeueReconciliation(), nil } // Delete the RBD clone image. diff --git a/internal/controller/util.go b/internal/controller/util.go index 3fa2d75c..1e360d06 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -4,12 +4,15 @@ import ( "context" "errors" "fmt" + "os" "strings" + "time" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -81,3 +84,15 @@ func IsJobConditionTrue(conditions []batchv1.JobCondition, conditionType batchv1 } return false } + +func requeueReconciliation() ctrl.Result { + requeueAfter := os.Getenv("REQUEUE_RECONCILIATION_AFTER") + if requeueAfter != "" { + duration, err := time.ParseDuration(requeueAfter) + if err != nil { + panic(fmt.Sprintf("set REQUEUE_RECONCILIATION_AFTER properly: %v", err)) + } + return ctrl.Result{RequeueAfter: duration} + } + return ctrl.Result{Requeue: true} +} diff --git a/test/e2e/Makefile b/test/e2e/Makefile index e1d817ff..87364559 100644 --- a/test/e2e/Makefile +++ b/test/e2e/Makefile @@ -25,6 +25,7 @@ LOOP_DEV2 := /dev/loop1 MINIKUBE_PROFILE_PRIMARY=profile1 MINIKUBE_PROFILE_SECONDARY=profile2 TMPDIR := tmp +TEST_MULTIK8S_PACKAGES := replication changetostandalone changetoprimary changetosecondary export MINIKUBE_HOME @@ -240,4 +241,4 @@ do-test-multik8s: $(GINKGO) E2ETEST=1 \ KUBECTL_PRIMARY="$(MINIKUBE) -p $(MINIKUBE_PROFILE_PRIMARY) kubectl -- " \ KUBECTL_SECONDARY="$(MINIKUBE) -p $(MINIKUBE_PROFILE_SECONDARY) kubectl -- " \ - $(GINKGO) --fail-fast -v $(GINKGO_FLAGS) multik8s + $(GINKGO) --fail-fast -v $(GINKGO_FLAGS) $(addprefix multik8s/, $(TEST_MULTIK8S_PACKAGES)) diff --git a/test/e2e/multik8s/changetoprimary/suite_test.go b/test/e2e/multik8s/changetoprimary/suite_test.go new file mode 100644 index 00000000..c406517e --- /dev/null +++ b/test/e2e/multik8s/changetoprimary/suite_test.go @@ -0,0 +1,122 @@ +package changetoprimary + +import ( + "context" + "os" + "testing" + "time" + + "github.com/cybozu-go/mantle/internal/controller" + . "github.com/cybozu-go/mantle/test/e2e/multik8s/testutil" + "github.com/cybozu-go/mantle/test/util" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestMtest(t *testing.T) { + if os.Getenv("E2ETEST") == "" { + t.Skip("Run under e2e/") + } + + RegisterFailHandler(Fail) + + SetDefaultEventuallyPollingInterval(time.Second) + SetDefaultEventuallyTimeout(3 * time.Minute) + EnforceDefaultTimeoutsWhenUsingContexts() + + RunSpecs(t, "replication test with multiple k8s clusters") +} + +var _ = Describe("Mantle", func() { + Context("wait controller to be ready", WaitControllerToBeReady) + Context("change to primary", changePrimaryToStandaloneTemporarily) +}) + +func changePrimaryToStandaloneTemporarily() { + Describe("change to primary", func() { + var ( + namespace string + pvcName0, backupName00, backupName01, writtenDataHash00, writtenDataHash01 string + pvcName1, backupName10, writtenDataHash10 string + ) + + /* + Overview of the test: + + primary k8s cluster | secondary k8s cluster + ============================|========================== + role=primary | role=secondary + PVC0, MB00 (created) | + | PVC0, MB00 (synced) + role=standalone (changed) | + MB01, PVC1, MB10 (created)| + role=primary (changed) | + | MB01, PVC1, MB10 (synced) + */ + + It("should replicate a MantleBackup resource", func(ctx context.Context) { + namespace = util.GetUniqueName("ns-") + pvcName0 = util.GetUniqueName("pvc-") + backupName00 = util.GetUniqueName("mb-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName0) + writtenDataHash00 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName0) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName0, backupName00) + WaitMantleBackupSynced(namespace, backupName00) + }) + + It("should change the role from primary to standalone", func() { + By("changing the primary mantle to standalone") + err := ChangeClusterRole(PrimaryK8sCluster, controller.RoleStandalone) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should restore the synced MantleBackup in the both clusters", func(ctx context.Context) { + restoreName00 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + }) + + It("should create a MantleBackup resource", func(ctx SpecContext) { + backupName01 = util.GetUniqueName("mb-") + writtenDataHash01 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName0) + + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName0, backupName01) + + pvcName1 = util.GetUniqueName("pvc-") + backupName10 = util.GetUniqueName("mb-") + + Eventually(func() error { + return ApplyPVCTemplate(PrimaryK8sCluster, namespace, pvcName1) + }).Should(Succeed()) + writtenDataHash10 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName1) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName1, backupName10) + }) + + It("should change the role from standalone to primary", func() { + By("changing the standalone mantle to primary") + err := ChangeClusterRole(PrimaryK8sCluster, controller.RolePrimary) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should synchronize MantleBackups correctly", func() { + WaitMantleBackupSynced(namespace, backupName01) + WaitMantleBackupSynced(namespace, backupName10) + }) + + It("should restore MantleBackups correctly", func(ctx SpecContext) { + restoreName00 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + 
EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + + restoreName01 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName01, restoreName01, writtenDataHash01) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName01, restoreName01, writtenDataHash01) + + restoreName10 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName10, restoreName10, writtenDataHash10) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName10, restoreName10, writtenDataHash10) + }) + }) +} diff --git a/test/e2e/multik8s/changetosecondary/suite_test.go b/test/e2e/multik8s/changetosecondary/suite_test.go new file mode 100644 index 00000000..f8874954 --- /dev/null +++ b/test/e2e/multik8s/changetosecondary/suite_test.go @@ -0,0 +1,152 @@ +package changetosecondary + +import ( + "context" + "os" + "testing" + "time" + + "github.com/cybozu-go/mantle/internal/controller" + . "github.com/cybozu-go/mantle/test/e2e/multik8s/testutil" + "github.com/cybozu-go/mantle/test/util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMtest(t *testing.T) { + if os.Getenv("E2ETEST") == "" { + t.Skip("Run under e2e/") + } + + RegisterFailHandler(Fail) + + SetDefaultEventuallyPollingInterval(time.Second) + SetDefaultEventuallyTimeout(3 * time.Minute) + EnforceDefaultTimeoutsWhenUsingContexts() + + RunSpecs(t, "replication test with multiple k8s clusters") +} + +var _ = Describe("Mantle", func() { + Context("wait controller to be ready", WaitControllerToBeReady) + Context("change to secondary", changeSecondaryToStandaloneTemporarily) +}) + +func changeSecondaryToStandaloneTemporarily() { + Describe("change to secondary", func() { + var ( + namespace string + pvcName0, backupName00, writtenDataHash00 string + pvcName1, backupName10, writtenDataHash10 string + pvcName2, backupName20, backupName21, writtenDataHash20, writtenDataHash21 string + ) + + /* + Overview of the test: + + primary k8s cluster | secondary k8s cluster + ===========================|========================== + role=primary | role=secondary + PVC0, MB00 (created) | + | PVC0, MB00 (synced) + | role=standalone (changed) + | PVC1, MB10 (created) + (MB10 doesn't exist) | + | role=secondary (changed) + PVC2, MB20 (created) | + | PVC2, MB20 (synced) + MB21 (created) | + | MB21 (synced) + (MB10 doesn't exist) | + */ + + It("should set up the environment", func(ctx context.Context) { + namespace = util.GetUniqueName("ns-") + pvcName0 = util.GetUniqueName("pvc-") + backupName00 = util.GetUniqueName("mb-") + pvcName1 = util.GetUniqueName("pvc-") + backupName10 = util.GetUniqueName("mb-") + pvcName2 = util.GetUniqueName("pvc-") + backupName20 = util.GetUniqueName("mb-") + backupName21 = util.GetUniqueName("mb-") + + SetupEnvironment(namespace) + }) + + It("should create and restore a MantleBackup resource", func(ctx context.Context) { + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName0) + + writtenDataHash00 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName0) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName0, backupName00) + WaitMantleBackupSynced(namespace, backupName00) + + restoreName00 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + }) +
It("should change the role from secondary to standalone", func() { + By("changing the secondary mantle to standalone") + err := ChangeClusterRole(SecondaryK8sCluster, controller.RoleStandalone) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should restore the synced MantleBackup in the both clusters", func(ctx context.Context) { + restoreName00 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + }) + + It("should create a MantleBackup resource in the secondary k8s cluster", func(ctx context.Context) { + CreatePVC(ctx, SecondaryK8sCluster, namespace, pvcName1) + writtenDataHash10 = WriteRandomDataToPV(ctx, SecondaryK8sCluster, namespace, pvcName1) + CreateMantleBackup(SecondaryK8sCluster, namespace, pvcName1, backupName10) + WaitMantleBackupReadyToUse(SecondaryK8sCluster, namespace, backupName10) + }) + + It("should ensure the MantleBackup created by standalone mantle doesn't exist in the primary k8s cluster", + func(ctx context.Context) { + EnsureMantleBackupNotExist(ctx, PrimaryK8sCluster, namespace, backupName10) + }) + + It("should change the role from standalone to secondary", func() { + By("changing the standalone mantle to secondary") + err := ChangeClusterRole(SecondaryK8sCluster, controller.RoleSecondary) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should create and synchronize new MantleBackup resources", func(ctx SpecContext) { + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName2) + + writtenDataHash20 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName2) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName2, backupName20) + WaitMantleBackupSynced(namespace, backupName20) + + writtenDataHash21 = WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName2) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName2, backupName21) + WaitMantleBackupSynced(namespace, backupName21) + }) + + It("should restore MantleBackups correctly", func(ctx SpecContext) { + restoreName00 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName00, restoreName00, writtenDataHash00) + + restoreName10 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName10, restoreName10, writtenDataHash10) + + restoreName20 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName20, restoreName20, writtenDataHash20) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName20, restoreName20, writtenDataHash20) + + restoreName21 := util.GetUniqueName("mr-") + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName21, restoreName21, writtenDataHash21) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName21, restoreName21, writtenDataHash21) + }) + + It("should ensure the MantleBackup created by standalone mantle doesn't exist in the primary k8s cluster", + func(ctx context.Context) { + EnsureMantleBackupNotExist(ctx, PrimaryK8sCluster, namespace, backupName10) + }) + }) +} diff --git a/test/e2e/multik8s/changetostandalone/suite_test.go b/test/e2e/multik8s/changetostandalone/suite_test.go new file mode 100644 index 00000000..5369971a --- /dev/null +++ 
b/test/e2e/multik8s/changetostandalone/suite_test.go @@ -0,0 +1,114 @@ +package changetostandalone + +import ( + "context" + "encoding/json" + "os" + "testing" + "time" + + mantlev1 "github.com/cybozu-go/mantle/api/v1" + "github.com/cybozu-go/mantle/internal/controller" + . "github.com/cybozu-go/mantle/test/e2e/multik8s/testutil" + "github.com/cybozu-go/mantle/test/util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMtest(t *testing.T) { + if os.Getenv("E2ETEST") == "" { + t.Skip("Run under e2e/") + } + + RegisterFailHandler(Fail) + + SetDefaultEventuallyPollingInterval(time.Second) + SetDefaultEventuallyTimeout(3 * time.Minute) + EnforceDefaultTimeoutsWhenUsingContexts() + + RunSpecs(t, "replication test with multiple k8s clusters") +} + +var _ = Describe("Mantle", func() { + Context("wait controller to be ready", WaitControllerToBeReady) + Context("change to standalone", changeToStandalone) +}) + +func changeToStandalone() { + Describe("change to standalone", func() { + var namespace, pvcName, backupName string + + It("should replicate a MantleBackup resource", func(ctx context.Context) { + namespace = util.GetUniqueName("ns-") + pvcName = util.GetUniqueName("pvc-") + backupName = util.GetUniqueName("mb-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName) + WaitMantleBackupSynced(namespace, backupName) + }) + + It("should change the roles to standalone", func() { + By("changing the primary mantle to standalone") + err := ChangeClusterRole(PrimaryK8sCluster, controller.RoleStandalone) + Expect(err).NotTo(HaveOccurred()) + By("changing the secondary mantle to standalone") + err = ChangeClusterRole(SecondaryK8sCluster, controller.RoleStandalone) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should delete MantleBackup created by primary mantle from standalone mantle", func(ctx SpecContext) { + By("deleting the MantleBackup in the primary cluster") + _, _, err := Kubectl(PrimaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName, "--wait=false") + Expect(err).NotTo(HaveOccurred()) + + By("checking that the MantleBackup is actually deleted") + Eventually(ctx, func(g Gomega) { + stdout, _, err := Kubectl(PrimaryK8sCluster, nil, "get", "mb", "-n", namespace, "-o", "json") + g.Expect(err).NotTo(HaveOccurred()) + var mbs mantlev1.MantleBackupList + err = json.Unmarshal(stdout, &mbs) + g.Expect(err).NotTo(HaveOccurred()) + found := false + for _, mb := range mbs.Items { + if mb.GetName() == backupName { + found = true + } + } + g.Expect(found).To(BeFalse()) + }).Should(Succeed()) + }) + + It("should NOT delete MantleBackup created by secondary mantle from standalone mantle", func(ctx SpecContext) { + By("deleting the MantleBackup in the secondary cluster") + _, _, err := Kubectl(SecondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName, "--wait=false") + Expect(err).NotTo(HaveOccurred()) + + By("checking that the MantleBackup is NOT deleted") + Consistently(ctx, func(g Gomega) { + stdout, _, err := Kubectl(SecondaryK8sCluster, nil, "get", "mb", "-n", namespace, "-o", "json") + g.Expect(err).NotTo(HaveOccurred()) + var mbs mantlev1.MantleBackupList + err = json.Unmarshal(stdout, &mbs) + g.Expect(err).NotTo(HaveOccurred()) + found := false + for _, mb := range mbs.Items { + if mb.GetName() == backupName { + found = true + } + } + g.Expect(found).To(BeTrue()) + }, "10s", "1s").Should(Succeed()) + }) + + It("should change their roles back to 
primary/secondary", func() { + By("reverting the standalone mantle back to primary in the primary K8s cluster") + err := ChangeClusterRole(PrimaryK8sCluster, controller.RolePrimary) + Expect(err).NotTo(HaveOccurred()) + By("reverting the standalone mantle back to secondary in the secondary K8s cluster") + err = ChangeClusterRole(SecondaryK8sCluster, controller.RoleSecondary) + Expect(err).NotTo(HaveOccurred()) + }) + }) +} diff --git a/test/e2e/multik8s/replication/suite_test.go b/test/e2e/multik8s/replication/suite_test.go new file mode 100644 index 00000000..cb8b9edf --- /dev/null +++ b/test/e2e/multik8s/replication/suite_test.go @@ -0,0 +1,329 @@ +package replication + +import ( + "errors" + "os" + "reflect" + "slices" + "testing" + "time" + + mantlev1 "github.com/cybozu-go/mantle/api/v1" + . "github.com/cybozu-go/mantle/test/e2e/multik8s/testutil" + "github.com/cybozu-go/mantle/test/util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func TestMtest(t *testing.T) { + if os.Getenv("E2ETEST") == "" { + t.Skip("Run under e2e/") + } + + RegisterFailHandler(Fail) + + SetDefaultEventuallyPollingInterval(time.Second) + SetDefaultEventuallyTimeout(3 * time.Minute) + EnforceDefaultTimeoutsWhenUsingContexts() + + RunSpecs(t, "replication test with multiple k8s clusters") +} + +var _ = Describe("Mantle", func() { + Context("wait controller to be ready", WaitControllerToBeReady) + Context("replication test", replicationTestSuite) +}) + +func replicationTestSuite() { + Describe("replication test", func() { + It("should correctly replicate PVC and MantleBackup resources", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName := util.GetUniqueName("mb-") + restoreName := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + writtenDataHash := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName) + WaitMantleBackupSynced(namespace, backupName) + + By("checking PVC is replicated") + Eventually(func() error { + primaryPVC, err := GetPVC(PrimaryK8sCluster, namespace, pvcName) + if err != nil { + return err + } + + pvc, err := GetPVC(SecondaryK8sCluster, namespace, pvcName) + if err != nil { + return err + } + if pvc.Annotations == nil || + pvc.Annotations["mantle.cybozu.io/remote-uid"] != string(primaryPVC.GetUID()) { + return errors.New("invalid remote-uid annotation") + } + primaryPVC.Spec.VolumeName = "" + pvc.Spec.VolumeName = "" + if !reflect.DeepEqual(primaryPVC.Spec, pvc.Spec) { + return errors.New("spec not equal") + } + if pvc.Status.Phase != corev1.ClaimBound { + return errors.New("pvc not bound") + } + + return nil + }).Should(Succeed()) + + By("checking MantleBackup is replicated") + Eventually(func() error { + primaryPVC, err := GetPVC(PrimaryK8sCluster, namespace, pvcName) + if err != nil { + return err + } + secondaryPVC, err := GetPVC(SecondaryK8sCluster, namespace, pvcName) + if err != nil { + return err + } + primaryMB, err := GetMB(PrimaryK8sCluster, namespace, backupName) + if err != nil { + return err + } + + secondaryMB, err := GetMB(SecondaryK8sCluster, namespace, backupName) + if err != nil { + return err + } + if !controllerutil.ContainsFinalizer(secondaryMB, "mantlebackup.mantle.cybozu.io/finalizer") { + return 
errors.New("finalizer not found") + } + if secondaryMB.Labels == nil || + secondaryMB.Labels["mantle.cybozu.io/local-backup-target-pvc-uid"] != string(secondaryPVC.GetUID()) || + secondaryMB.Labels["mantle.cybozu.io/remote-backup-target-pvc-uid"] != string(primaryPVC.GetUID()) { + return errors.New("local/remote-backup-target-pvc-uid label not matched") + } + if secondaryMB.Annotations == nil || + secondaryMB.Annotations["mantle.cybozu.io/remote-uid"] != string(primaryMB.GetUID()) { + return errors.New("remote-uid not matched") + } + if !reflect.DeepEqual(primaryMB.Spec, secondaryMB.Spec) { + return errors.New("spec not equal") + } + if secondaryMB.Status.CreatedAt.IsZero() { + return errors.New(".Status.CreatedAt is zero") + } + if !meta.IsStatusConditionTrue(secondaryMB.Status.Conditions, "ReadyToUse") { + return errors.New("ReadyToUse of .Status.Conditions is not True") + } + + return nil + }).Should(Succeed()) + + EnsureTemporaryResourcesDeleted(ctx) + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName, restoreName, writtenDataHash) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName, restoreName, writtenDataHash) + }) + + //nolint:dupl + It("should back up correctly if previous MB is deleted in the secondary cluster", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName0 := util.GetUniqueName("mb-") + backupName1 := util.GetUniqueName("mb-") + restoreName0 := util.GetUniqueName("mr-") + restoreName1 := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + writtenDataHash0 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + + // create M0. + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName0) + WaitMantleBackupSynced(namespace, backupName0) + + // remove M0'. + _, _, err := Kubectl(SecondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName0) + Expect(err).NotTo(HaveOccurred()) + + // create M1. + writtenDataHash1 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName1) + WaitMantleBackupSynced(namespace, backupName1) + EnsureTemporaryResourcesDeleted(ctx) + + // Make sure M1 and M1' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + + // Make sure M0 can be used for restoration. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + }) + + //nolint:dupl + It("should back up correctly if previous MB is deleted in the primary cluster", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName0 := util.GetUniqueName("mb-") + backupName1 := util.GetUniqueName("mb-") + restoreName0 := util.GetUniqueName("mr-") + restoreName1 := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + writtenDataHash0 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + + // create M0. + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName0) + WaitMantleBackupSynced(namespace, backupName0) + + // remove M0. 
+ _, _, err := Kubectl(PrimaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName0) + Expect(err).NotTo(HaveOccurred()) + + // create M1. + writtenDataHash1 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName1) + WaitMantleBackupSynced(namespace, backupName1) + EnsureTemporaryResourcesDeleted(ctx) + + // Make sure M1 and M1' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + + // Make sure M0' can be used for restoration. + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + }) + + It("should perform a correct incremental backup", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName0 := util.GetUniqueName("mb-") + backupName1 := util.GetUniqueName("mb-") + restoreName0 := util.GetUniqueName("mr-") + restoreName1 := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + + // create M0. + writtenDataHash0 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName0) + WaitMantleBackupSynced(namespace, backupName0) + EnsureTemporaryResourcesDeleted(ctx) + + // create M1. + writtenDataHash1 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName1) + WaitMantleBackupSynced(namespace, backupName1) + EnsureTemporaryResourcesDeleted(ctx) + + // Make sure M1 and M1' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + + // Make sure M0 and M0' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + }) + + It("should back up correctly if previous incremental MB is removed in the secondary cluster", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName0 := util.GetUniqueName("mb-") + backupName1 := util.GetUniqueName("mb-") + backupName2 := util.GetUniqueName("mb-") + restoreName0 := util.GetUniqueName("mr-") + restoreName1 := util.GetUniqueName("mr-") + restoreName2 := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + + // create M0. + writtenDataHash0 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName0) + WaitMantleBackupSynced(namespace, backupName0) + + // create M1. + writtenDataHash1 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName1) + WaitMantleBackupSynced(namespace, backupName1) + + // remove M1'. + _, _, err := Kubectl(SecondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName1) + Expect(err).NotTo(HaveOccurred()) + + // create M2. 
+ writtenDataHash2 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName2) + WaitMantleBackupSynced(namespace, backupName2) + EnsureTemporaryResourcesDeleted(ctx) + + // Make sure M2 and M2' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) + + // Make sure M1 has the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + + // Make sure M0 and M0' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + + // Make sure M1' isn't re-created. + mbList, err := GetObjectList[mantlev1.MantleBackupList](SecondaryK8sCluster, "mb", namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(mbList.Items, func(mb mantlev1.MantleBackup) bool { + return mb.GetName() == backupName1 + })).To(BeFalse()) + }) + + It("should back up correctly if previous incremental MB is removed in the primary cluster", func(ctx SpecContext) { + namespace := util.GetUniqueName("ns-") + pvcName := util.GetUniqueName("pvc-") + backupName0 := util.GetUniqueName("mb-") + backupName1 := util.GetUniqueName("mb-") + backupName2 := util.GetUniqueName("mb-") + restoreName0 := util.GetUniqueName("mr-") + restoreName1 := util.GetUniqueName("mr-") + restoreName2 := util.GetUniqueName("mr-") + + SetupEnvironment(namespace) + CreatePVC(ctx, PrimaryK8sCluster, namespace, pvcName) + + // create M0. + writtenDataHash0 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName0) + WaitMantleBackupSynced(namespace, backupName0) + + // create M1. + writtenDataHash1 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName1) + WaitMantleBackupSynced(namespace, backupName1) + + // remove M1. + _, _, err := Kubectl(PrimaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName1) + Expect(err).NotTo(HaveOccurred()) + + // create M2. + writtenDataHash2 := WriteRandomDataToPV(ctx, PrimaryK8sCluster, namespace, pvcName) + CreateMantleBackup(PrimaryK8sCluster, namespace, pvcName, backupName2) + WaitMantleBackupSynced(namespace, backupName2) + EnsureTemporaryResourcesDeleted(ctx) + + // Make sure M2 and M2' have the same contents. + EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) + + // Make sure M1' has the same contents. + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) + + // Make sure M0 and M0' have the same contents. 
+ EnsureCorrectRestoration(PrimaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + EnsureCorrectRestoration(SecondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) + }) + }) +} diff --git a/test/e2e/multik8s/suite_test.go b/test/e2e/multik8s/suite_test.go deleted file mode 100644 index da6f350c..00000000 --- a/test/e2e/multik8s/suite_test.go +++ /dev/null @@ -1,554 +0,0 @@ -package multik8s - -import ( - "context" - _ "embed" - "encoding/json" - "errors" - "fmt" - "os" - "reflect" - "slices" - "strings" - "testing" - "time" - - "github.com/cybozu-go/mantle/internal/controller" - "github.com/cybozu-go/mantle/test/util" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/meta" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - mantlev1 "github.com/cybozu-go/mantle/api/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" -) - -func TestMtest(t *testing.T) { - if os.Getenv("E2ETEST") == "" { - t.Skip("Run under e2e/") - } - - RegisterFailHandler(Fail) - - SetDefaultEventuallyPollingInterval(time.Second) - SetDefaultEventuallyTimeout(3 * time.Minute) - EnforceDefaultTimeoutsWhenUsingContexts() - - RunSpecs(t, "rbd backup system test with multiple k8s clusters") -} - -var _ = Describe("Mantle", func() { - Context("wait controller to be ready", waitControllerToBeReady) - Context("replication test", replicationTestSuite) - Context("change to standalone", changeToStandalone) -}) - -func waitControllerToBeReady() { - It("wait for mantle-controller to be ready", func() { - Eventually(func() error { - return checkDeploymentReady(primaryK8sCluster, "rook-ceph", "mantle-controller") - }).Should(Succeed()) - - Eventually(func() error { - return checkDeploymentReady(primaryK8sCluster, "rook-ceph", "mantle-controller") - }).Should(Succeed()) - }) -} - -func setupEnvironment(namespace, pvcName string) { - GinkgoHelper() - By("setting up the environment") - Eventually(func() error { - return createNamespace(primaryK8sCluster, namespace) - }).Should(Succeed()) - Eventually(func() error { - return createNamespace(secondaryK8sCluster, namespace) - }).Should(Succeed()) - Eventually(func() error { - return applyRBDPoolAndSCTemplate(primaryK8sCluster, cephClusterNamespace) - }).Should(Succeed()) - Eventually(func() error { - return applyRBDPoolAndSCTemplate(secondaryK8sCluster, cephClusterNamespace) - }).Should(Succeed()) - Eventually(func() error { - return applyPVCTemplate(primaryK8sCluster, namespace, pvcName) - }).Should(Succeed()) -} - -func writeRandomDataToPV(ctx context.Context, namespace, pvcName string) string { - GinkgoHelper() - By("writing some random data to PV(C)") - writeJobName := util.GetUniqueName("job-") - Eventually(ctx, func() error { - return applyWriteJobTemplate(primaryK8sCluster, namespace, writeJobName, pvcName) - }).Should(Succeed()) - Eventually(ctx, func(g Gomega) { - job, err := getJob(primaryK8sCluster, namespace, writeJobName) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(IsJobConditionTrue(job.Status.Conditions, batchv1.JobComplete)).To(BeTrue()) - }).Should(Succeed()) - stdout, _, err := kubectl(primaryK8sCluster, nil, "logs", "-n", namespace, "job/"+writeJobName) - Expect(err).NotTo(HaveOccurred()) - Expect(len(stdout)).NotTo(Equal(0)) - return string(stdout) -} - -func createMantleBackup(namespace, pvcName, backupName string) { - GinkgoHelper() - By("creating a MantleBackup object") - Eventually(func() error { - return 
applyMantleBackupTemplate(primaryK8sCluster, namespace, pvcName, backupName) - }).Should(Succeed()) -} - -func waitMantleBackupSynced(namespace, backupName string) { - GinkgoHelper() - By("checking MantleBackup's SyncedToRemote status") - Eventually(func() error { - mb, err := getMB(primaryK8sCluster, namespace, backupName) - if err != nil { - return err - } - if !meta.IsStatusConditionTrue(mb.Status.Conditions, mantlev1.BackupConditionSyncedToRemote) { - return errors.New("status of SyncedToRemote condition is not True") - } - return nil - }, "10m", "1s").Should(Succeed()) -} - -func ensureTemporaryResourcesDeleted(ctx context.Context) { - GinkgoHelper() - By("checking all temporary Jobs related to export and import of RBD images are removed") - primaryJobList, err := getObjectList[batchv1.JobList](primaryK8sCluster, "job", cephClusterNamespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(primaryJobList.Items, func(job batchv1.Job) bool { - n := job.GetName() - return strings.HasPrefix(n, controller.MantleExportJobPrefix) || - strings.HasPrefix(n, controller.MantleUploadJobPrefix) - })).To(BeFalse()) - secondaryJobList, err := getObjectList[batchv1.JobList](secondaryK8sCluster, "job", cephClusterNamespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(secondaryJobList.Items, func(job batchv1.Job) bool { - n := job.GetName() - return strings.HasPrefix(n, controller.MantleImportJobPrefix) || - strings.HasPrefix(n, controller.MantleDiscardJobPrefix) - })).To(BeFalse()) - - By("checking all temporary PVCs related to export and import of RBD images are removed") - primaryPVCList, err := getObjectList[corev1.PersistentVolumeClaimList]( - primaryK8sCluster, "pvc", cephClusterNamespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(primaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool { - n := pvc.GetName() - return strings.HasPrefix(n, controller.MantleExportDataPVCPrefix) - })).To(BeFalse()) - secondaryPVCList, err := getObjectList[corev1.PersistentVolumeClaimList]( - secondaryK8sCluster, "pvc", cephClusterNamespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(secondaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool { - n := pvc.GetName() - return strings.HasPrefix(n, controller.MantleDiscardPVCPrefix) - })).To(BeFalse()) - - By("checking all temporary PVs related to export and import of RBD images are removed") - secondaryPVList, err := getObjectList[corev1.PersistentVolumeList](secondaryK8sCluster, "pv", cephClusterNamespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(secondaryPVList.Items, func(pv corev1.PersistentVolume) bool { - n := pv.GetName() - return strings.HasPrefix(n, controller.MantleDiscardPVPrefix) - })).To(BeFalse()) - - By("checking all temporary objects in the object storage related to export and import of RBD images are removed") - objectStorageClient, err := createObjectStorageClient(ctx) - Expect(err).NotTo(HaveOccurred()) - listOutput, err := objectStorageClient.listObjects(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(len(listOutput.Contents)).To(Equal(0)) -} - -func ensureCorrectRestoration( - clusterNo int, - ctx context.Context, - namespace, backupName, restoreName, writtenDataHash string, -) { - GinkgoHelper() - mountDeployName := util.GetUniqueName("deploy-") - clusterName := "primary" - if clusterNo == secondaryK8sCluster { - clusterName = "secondary" - } - By(fmt.Sprintf("%s: %s: creating MantleRestore by using the MantleBackup 
replicated above", - clusterName, backupName)) - Eventually(ctx, func() error { - return applyMantleRestoreTemplate(clusterNo, namespace, restoreName, backupName) - }).Should(Succeed()) - By(fmt.Sprintf("%s: %s: checking the MantleRestore can be ready to use", clusterName, backupName)) - Eventually(ctx, func(g Gomega) { - mr, err := getMR(clusterNo, namespace, restoreName) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(meta.IsStatusConditionTrue(mr.Status.Conditions, "ReadyToUse")).To(BeTrue()) - }).Should(Succeed()) - By(fmt.Sprintf("%s: %s: checking the MantleRestore has the correct contents", clusterName, backupName)) - Eventually(ctx, func(g Gomega) { - err := applyMountDeployTemplate(clusterNo, namespace, mountDeployName, restoreName) - g.Expect(err).NotTo(HaveOccurred()) - stdout, _, err := kubectl(clusterNo, nil, "exec", "-n", namespace, "deploy/"+mountDeployName, "--", - "bash", "-c", "sha256sum /volume/data | awk '{print $1}'") - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(string(stdout)).To(Equal(writtenDataHash)) - }).Should(Succeed()) -} - -func replicationTestSuite() { - Describe("replication test", func() { - It("should correctly replicate PVC and MantleBackup resources", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName := util.GetUniqueName("mb-") - restoreName := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - writtenDataHash := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName) - waitMantleBackupSynced(namespace, backupName) - - By("checking PVC is replicated") - Eventually(func() error { - primaryPVC, err := getPVC(primaryK8sCluster, namespace, pvcName) - if err != nil { - return err - } - - pvc, err := getPVC(secondaryK8sCluster, namespace, pvcName) - if err != nil { - return err - } - if pvc.Annotations == nil || - pvc.Annotations["mantle.cybozu.io/remote-uid"] != string(primaryPVC.GetUID()) { - return errors.New("invalid remote-uid annotation") - } - primaryPVC.Spec.VolumeName = "" - pvc.Spec.VolumeName = "" - if !reflect.DeepEqual(primaryPVC.Spec, pvc.Spec) { - return errors.New("spec not equal") - } - if pvc.Status.Phase != corev1.ClaimBound { - return errors.New("pvc not bound") - } - - return nil - }).Should(Succeed()) - - By("checking MantleBackup is replicated") - Eventually(func() error { - primaryPVC, err := getPVC(primaryK8sCluster, namespace, pvcName) - if err != nil { - return err - } - secondaryPVC, err := getPVC(secondaryK8sCluster, namespace, pvcName) - if err != nil { - return err - } - primaryMB, err := getMB(primaryK8sCluster, namespace, backupName) - if err != nil { - return err - } - - secondaryMB, err := getMB(secondaryK8sCluster, namespace, backupName) - if err != nil { - return err - } - if !controllerutil.ContainsFinalizer(secondaryMB, "mantlebackup.mantle.cybozu.io/finalizer") { - return errors.New("finalizer not found") - } - if secondaryMB.Labels == nil || - secondaryMB.Labels["mantle.cybozu.io/local-backup-target-pvc-uid"] != string(secondaryPVC.GetUID()) || - secondaryMB.Labels["mantle.cybozu.io/remote-backup-target-pvc-uid"] != string(primaryPVC.GetUID()) { - return errors.New("local/remote-backup-target-pvc-uid label not matched") - } - if secondaryMB.Annotations == nil || - secondaryMB.Annotations["mantle.cybozu.io/remote-uid"] != string(primaryMB.GetUID()) { - return errors.New("remote-uid not matched") - } - if !reflect.DeepEqual(primaryMB.Spec, secondaryMB.Spec) { - return errors.New("spec not 
equal") - } - if secondaryMB.Status.CreatedAt.IsZero() { - return errors.New(".Status.CreatedAt is zero") - } - if !meta.IsStatusConditionTrue(secondaryMB.Status.Conditions, "ReadyToUse") { - return errors.New("ReadyToUse of .Status.Conditions is not True") - } - - return nil - }).Should(Succeed()) - - ensureTemporaryResourcesDeleted(ctx) - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName, restoreName, writtenDataHash) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName, restoreName, writtenDataHash) - }) - - It("should back up correctly if previous MB is deleted in the secondary cluster", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName0 := util.GetUniqueName("mb-") - backupName1 := util.GetUniqueName("mb-") - restoreName0 := util.GetUniqueName("mr-") - restoreName1 := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - writtenDataHash0 := writeRandomDataToPV(ctx, namespace, pvcName) - - // create M0. - createMantleBackup(namespace, pvcName, backupName0) - waitMantleBackupSynced(namespace, backupName0) - - // remove M0'. - _, _, err := kubectl(secondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName0) - Expect(err).NotTo(HaveOccurred()) - - // create M1. - writtenDataHash1 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName1) - waitMantleBackupSynced(namespace, backupName1) - ensureTemporaryResourcesDeleted(ctx) - - // Make sure M1 and M1' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - - // Make sure M0 can be used for restoration. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - }) - - It("should back up correctly if previous MB is deleted in the primary cluster", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName0 := util.GetUniqueName("mb-") - backupName1 := util.GetUniqueName("mb-") - restoreName0 := util.GetUniqueName("mr-") - restoreName1 := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - writtenDataHash0 := writeRandomDataToPV(ctx, namespace, pvcName) - - // create M0. - createMantleBackup(namespace, pvcName, backupName0) - waitMantleBackupSynced(namespace, backupName0) - - // remove M0. - _, _, err := kubectl(primaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName0) - Expect(err).NotTo(HaveOccurred()) - - // create M1. - writtenDataHash1 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName1) - waitMantleBackupSynced(namespace, backupName1) - ensureTemporaryResourcesDeleted(ctx) - - // Make sure M1 and M1' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - - // Make sure M0' can be used for restoration. 
- ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - }) - - It("should perform a correct incremental backup", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName0 := util.GetUniqueName("mb-") - backupName1 := util.GetUniqueName("mb-") - restoreName0 := util.GetUniqueName("mr-") - restoreName1 := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - - // create M0. - writtenDataHash0 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName0) - waitMantleBackupSynced(namespace, backupName0) - ensureTemporaryResourcesDeleted(ctx) - - // create M1. - writtenDataHash1 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName1) - waitMantleBackupSynced(namespace, backupName1) - ensureTemporaryResourcesDeleted(ctx) - - // Make sure M1 and M1' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - - // Make sure M0 and M0' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - }) - - It("should back up correctly if previous incremental MB is removed in the secondary cluster", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName0 := util.GetUniqueName("mb-") - backupName1 := util.GetUniqueName("mb-") - backupName2 := util.GetUniqueName("mb-") - restoreName0 := util.GetUniqueName("mr-") - restoreName1 := util.GetUniqueName("mr-") - restoreName2 := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - - // create M0. - writtenDataHash0 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName0) - waitMantleBackupSynced(namespace, backupName0) - - // create M1. - writtenDataHash1 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName1) - waitMantleBackupSynced(namespace, backupName1) - - // remove M1'. - _, _, err := kubectl(secondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName1) - Expect(err).NotTo(HaveOccurred()) - - // create M2. - writtenDataHash2 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName2) - waitMantleBackupSynced(namespace, backupName2) - ensureTemporaryResourcesDeleted(ctx) - - // Make sure M2 and M2' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) - - // Make sure M1 has the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - - // Make sure M0 and M0' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - - // Make sure M1' isn't re-created. 
- mbList, err := getObjectList[mantlev1.MantleBackupList](secondaryK8sCluster, "mb", namespace) - Expect(err).NotTo(HaveOccurred()) - Expect(slices.ContainsFunc(mbList.Items, func(mb mantlev1.MantleBackup) bool { - return mb.GetName() == backupName1 - })).To(BeFalse()) - }) - - It("should back up correctly if previous incremental MB is removed in the primary cluster", func(ctx SpecContext) { - namespace := util.GetUniqueName("ns-") - pvcName := util.GetUniqueName("pvc-") - backupName0 := util.GetUniqueName("mb-") - backupName1 := util.GetUniqueName("mb-") - backupName2 := util.GetUniqueName("mb-") - restoreName0 := util.GetUniqueName("mr-") - restoreName1 := util.GetUniqueName("mr-") - restoreName2 := util.GetUniqueName("mr-") - - setupEnvironment(namespace, pvcName) - - // create M0. - writtenDataHash0 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName0) - waitMantleBackupSynced(namespace, backupName0) - - // create M1. - writtenDataHash1 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName1) - waitMantleBackupSynced(namespace, backupName1) - - // remove M1. - _, _, err := kubectl(primaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName1) - Expect(err).NotTo(HaveOccurred()) - - // create M2. - writtenDataHash2 := writeRandomDataToPV(ctx, namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName2) - waitMantleBackupSynced(namespace, backupName2) - ensureTemporaryResourcesDeleted(ctx) - - // Make sure M2 and M2' have the same contents. - ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName2, restoreName2, writtenDataHash2) - - // Make sure M1' has the same contents. - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName1, restoreName1, writtenDataHash1) - - // Make sure M0 and M0' have the same contents. 
- ensureCorrectRestoration(primaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - ensureCorrectRestoration(secondaryK8sCluster, ctx, namespace, backupName0, restoreName0, writtenDataHash0) - }) - }) -} - -func changeToStandalone() { - Describe("change to standalone", func() { - var namespace, pvcName, backupName string - - It("should replicate a MantleBackup resource", func() { - namespace = util.GetUniqueName("ns-") - pvcName = util.GetUniqueName("pvc-") - backupName = util.GetUniqueName("mb-") - - setupEnvironment(namespace, pvcName) - createMantleBackup(namespace, pvcName, backupName) - waitMantleBackupSynced(namespace, backupName) - }) - - It("should change the roles to standalone", func() { - By("changing the primary mantle to standalone") - err := changeClusterRole(primaryK8sCluster, controller.RoleStandalone) - Expect(err).NotTo(HaveOccurred()) - By("changing the secondary mantle to standalone") - err = changeClusterRole(secondaryK8sCluster, controller.RoleStandalone) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should delete MantleBackup created by primary mantle from standalone mantle", func(ctx SpecContext) { - By("deleting the MantleBackup in the primary cluster") - _, _, err := kubectl(primaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName, "--wait=false") - Expect(err).NotTo(HaveOccurred()) - - By("checking that the MantleBackup is actually deleted") - Eventually(ctx, func(g Gomega) { - stdout, _, err := kubectl(primaryK8sCluster, nil, "get", "mb", "-n", namespace, "-o", "json") - g.Expect(err).NotTo(HaveOccurred()) - var mbs mantlev1.MantleBackupList - err = json.Unmarshal(stdout, &mbs) - g.Expect(err).NotTo(HaveOccurred()) - found := false - for _, mb := range mbs.Items { - if mb.GetName() == backupName { - found = true - } - } - g.Expect(found).To(BeFalse()) - }).Should(Succeed()) - }) - - It("should NOT delete MantleBackup created by secondary mantle from standalone mantle", func(ctx SpecContext) { - By("deleting the MantleBackup in the secondary cluster") - _, _, err := kubectl(secondaryK8sCluster, nil, "delete", "mb", "-n", namespace, backupName, "--wait=false") - Expect(err).NotTo(HaveOccurred()) - - By("checking that the MantleBackup is NOT deleted") - Consistently(ctx, func(g Gomega) { - stdout, _, err := kubectl(secondaryK8sCluster, nil, "get", "mb", "-n", namespace, "-o", "json") - g.Expect(err).NotTo(HaveOccurred()) - var mbs mantlev1.MantleBackupList - err = json.Unmarshal(stdout, &mbs) - g.Expect(err).NotTo(HaveOccurred()) - found := false - for _, mb := range mbs.Items { - if mb.GetName() == backupName { - found = true - } - } - g.Expect(found).To(BeTrue()) - }, "10s", "1s").Should(Succeed()) - }) - }) -} diff --git a/test/e2e/multik8s/testdata/mantlebackup-template.yaml b/test/e2e/multik8s/testutil/testdata/mantlebackup-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/mantlebackup-template.yaml rename to test/e2e/multik8s/testutil/testdata/mantlebackup-template.yaml diff --git a/test/e2e/multik8s/testdata/mantlerestore-template.yaml b/test/e2e/multik8s/testutil/testdata/mantlerestore-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/mantlerestore-template.yaml rename to test/e2e/multik8s/testutil/testdata/mantlerestore-template.yaml diff --git a/test/e2e/multik8s/testdata/mount-deploy-template.yaml b/test/e2e/multik8s/testutil/testdata/mount-deploy-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/mount-deploy-template.yaml rename to 
test/e2e/multik8s/testutil/testdata/mount-deploy-template.yaml diff --git a/test/e2e/multik8s/testdata/pvc-template.yaml b/test/e2e/multik8s/testutil/testdata/pvc-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/pvc-template.yaml rename to test/e2e/multik8s/testutil/testdata/pvc-template.yaml diff --git a/test/e2e/multik8s/testdata/rbd-pool-sc-template.yaml b/test/e2e/multik8s/testutil/testdata/rbd-pool-sc-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/rbd-pool-sc-template.yaml rename to test/e2e/multik8s/testutil/testdata/rbd-pool-sc-template.yaml diff --git a/test/e2e/multik8s/testdata/write-job-template.yaml b/test/e2e/multik8s/testutil/testdata/write-job-template.yaml similarity index 100% rename from test/e2e/multik8s/testdata/write-job-template.yaml rename to test/e2e/multik8s/testutil/testdata/write-job-template.yaml diff --git a/test/e2e/multik8s/testutil/util.go b/test/e2e/multik8s/testutil/util.go new file mode 100644 index 00000000..c1c0b2e9 --- /dev/null +++ b/test/e2e/multik8s/testutil/util.go @@ -0,0 +1,562 @@ +package testutil + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "slices" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + mantlev1 "github.com/cybozu-go/mantle/api/v1" + "github.com/cybozu-go/mantle/internal/controller" + "github.com/cybozu-go/mantle/test/util" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" +) + +const ( + CephClusterNamespace = "rook-ceph" + PrimaryK8sCluster = 1 + SecondaryK8sCluster = 2 +) + +var ( + //go:embed testdata/pvc-template.yaml + testPVCTemplate string + //go:embed testdata/rbd-pool-sc-template.yaml + testRBDPoolSCTemplate string + //go:embed testdata/mantlebackup-template.yaml + testMantleBackupTemplate string + //go:embed testdata/mantlerestore-template.yaml + testMantleRestoreTemplate string + //go:embed testdata/mount-deploy-template.yaml + mountDeployTemplate string + //go:embed testdata/write-job-template.yaml + writeJobTemplate string + + kubectlPrefixPrimary = os.Getenv("KUBECTL_PRIMARY") + kubectlPrefixSecondary = os.Getenv("KUBECTL_SECONDARY") +) + +func execAtLocal(ctx context.Context, cmd string, input []byte, args ...string) ([]byte, []byte, error) { + var stdout, stderr bytes.Buffer + command := exec.CommandContext(ctx, cmd, args...) + command.Stdout = &stdout + command.Stderr = &stderr + + if len(input) != 0 { + command.Stdin = bytes.NewReader(input) + } + + err := command.Run() + return stdout.Bytes(), stderr.Bytes(), err +} + +// input can be nil +func Kubectl(clusterNo int, input []byte, args ...string) ([]byte, []byte, error) { + kubectlPrefix := "" + switch clusterNo { + case PrimaryK8sCluster: + kubectlPrefix = kubectlPrefixPrimary + case SecondaryK8sCluster: + kubectlPrefix = kubectlPrefixSecondary + default: + panic(fmt.Sprintf("invalid clusterNo: %d", clusterNo)) + } + if len(kubectlPrefix) == 0 { + panic("Either KUBECTL_PRIMARY or KUBECTL_SECONDARY environment variable is not set") + } + fields := strings.Fields(kubectlPrefix) + fields = append(fields, args...) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + return execAtLocal(ctx, fields[0], input, fields[1:]...) 
+} + +func CheckDeploymentReady(clusterNo int, namespace, name string) error { + _, stderr, err := Kubectl( + clusterNo, nil, + "-n", namespace, "wait", "--for=condition=Available", "deploy", name, "--timeout=1m", + ) + if err != nil { + return fmt.Errorf("kubectl wait deploy failed. stderr: %s, err: %w", string(stderr), err) + } + return nil +} + +func ApplyMantleBackupTemplate(clusterNo int, namespace, pvcName, backupName string) error { + manifest := fmt.Sprintf(testMantleBackupTemplate, backupName, backupName, namespace, pvcName) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-f", "-") + if err != nil { + return fmt.Errorf("kubectl apply mantlebackup failed. err: %w", err) + } + return nil +} + +func ApplyMantleRestoreTemplate(clusterNo int, namespace, restoreName, backupName string) error { + manifest := fmt.Sprintf(testMantleRestoreTemplate, restoreName, restoreName, namespace, backupName) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-f", "-") + if err != nil { + return fmt.Errorf("kubectl apply mantlerestore failed. err: %w", err) + } + return nil +} + +func ApplyPVCTemplate(clusterNo int, namespace, name string) error { + manifest := fmt.Sprintf(testPVCTemplate, name) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") + if err != nil { + return fmt.Errorf("kubectl apply pvc failed. err: %w", err) + } + return nil +} + +func ApplyMountDeployTemplate(clusterNo int, namespace, name, pvcName string) error { + manifest := fmt.Sprintf(mountDeployTemplate, name, namespace, name, name, pvcName) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") + if err != nil { + return fmt.Errorf("kubectl apply mount deploy failed. err: %w", err) + } + return nil +} + +func ApplyWriteJobTemplate(clusterNo int, namespace, name, pvcName string) error { + manifest := fmt.Sprintf(writeJobTemplate, name, namespace, pvcName) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") + if err != nil { + return fmt.Errorf("kubectl apply write job failed. err: %w", err) + } + return nil +} + +func CreateNamespace(clusterNo int, name string) error { + _, _, err := Kubectl(clusterNo, nil, "create", "ns", name) + if err != nil { + return fmt.Errorf("kubectl create ns failed. 
err: %w", err) + } + return nil +} + +func ApplyRBDPoolAndSCTemplate(clusterNo int, namespace string) error { //nolint:unparam + manifest := fmt.Sprintf( + testRBDPoolSCTemplate, namespace, + namespace, namespace, namespace, namespace) + _, _, err := Kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") + if err != nil { + return err + } + return nil +} + +func GetObject[T any](clusterNo int, kind, namespace, name string) (*T, error) { + stdout, _, err := Kubectl(clusterNo, nil, "get", kind, "-n", namespace, name, "-o", "json") + if err != nil { + return nil, err + } + + var obj T + if err := json.Unmarshal(stdout, &obj); err != nil { + return nil, err + } + + return &obj, nil +} + +func GetMB(clusterNo int, namespace, name string) (*mantlev1.MantleBackup, error) { + return GetObject[mantlev1.MantleBackup](clusterNo, "mantlebackup", namespace, name) +} + +func GetPVC(clusterNo int, namespace, name string) (*corev1.PersistentVolumeClaim, error) { + return GetObject[corev1.PersistentVolumeClaim](clusterNo, "pvc", namespace, name) +} + +func GetMR(clusterNo int, namespace, name string) (*mantlev1.MantleRestore, error) { + return GetObject[mantlev1.MantleRestore](clusterNo, "mantlerestore", namespace, name) +} + +func GetDeploy(clusterNo int, namespace, name string) (*appsv1.Deployment, error) { + return GetObject[appsv1.Deployment](clusterNo, "deploy", namespace, name) +} + +func GetJob(clusterNo int, namespace, name string) (*batchv1.Job, error) { + return GetObject[batchv1.Job](clusterNo, "job", namespace, name) +} + +func GetObjectList[T any](clusterNo int, kind, namespace string) (*T, error) { + var stdout []byte + var err error + if namespace == "" { + stdout, _, err = Kubectl(clusterNo, nil, "get", kind, "-o", "json") + } else { + stdout, _, err = Kubectl(clusterNo, nil, "get", kind, "-n", namespace, "-o", "json") + } + if err != nil { + return nil, err + } + + var objList T + if err := json.Unmarshal(stdout, &objList); err != nil { + return nil, err + } + + return &objList, nil +} + +func GetMBList(clusterNo int, namespace string) (*mantlev1.MantleBackupList, error) { + return GetObjectList[mantlev1.MantleBackupList](clusterNo, "mantlebackup", namespace) +} + +func ChangeClusterRole(clusterNo int, newRole string) error { + deployName := "mantle-controller" + deploy, err := GetDeploy(clusterNo, CephClusterNamespace, deployName) + if err != nil { + return fmt.Errorf("failed to get mantle-controller deploy: %w", err) + } + + roleIndex := slices.IndexFunc( + deploy.Spec.Template.Spec.Containers[0].Args, + func(arg string) bool { return strings.HasPrefix(arg, "--role=") }, + ) + if roleIndex == -1 { + return errors.New("failed to find --role= argument") + } + + _, _, err = Kubectl( + clusterNo, nil, "patch", "deploy", "-n", CephClusterNamespace, deployName, "--type=json", + fmt.Sprintf( + `-p=[{"op": "replace", "path": "/spec/template/spec/containers/0/args/%d", "value":"--role=%s"}]`, + roleIndex, + newRole, + ), + ) + if err != nil { + return fmt.Errorf("failed to patch mantle-controller deploy: %w", err) + } + + // Wait for the new controller to start + numRetries := 10 + for i := 0; i < numRetries; i++ { + stdout, _, err := Kubectl(clusterNo, nil, "get", "pod", "-n", CephClusterNamespace, "-o", "json") + if err != nil { + return fmt.Errorf("failed to get pod: %w", err) + } + var pods corev1.PodList + err = json.Unmarshal(stdout, &pods) + if err != nil { + return fmt.Errorf("failed to unmarshal pod list: %w", err) + } + ready := true + for _, pod := range pods.Items { 
+ if strings.HasPrefix(pod.GetName(), deployName) { + for _, container := range pod.Spec.Containers { + if !slices.Contains(container.Args, fmt.Sprintf("--role=%s", newRole)) { + ready = false + } + } + } + } + if ready { + break + } + time.Sleep(10 * time.Second) + } + + return nil +} + +type ObjectStorageClient struct { + cli *s3.Client + bucketName string +} + +func (c *ObjectStorageClient) listObjects(ctx context.Context) (*s3.ListObjectsV2Output, error) { + return c.cli.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: &c.bucketName, + }) +} + +func CreateObjectStorageClient(ctx context.Context) (*ObjectStorageClient, error) { + // Find the endpoint of the object storage from the command-line arguments for mantle-controller. + stdout, _, err := Kubectl(PrimaryK8sCluster, nil, + "get", "deploy", "-n", CephClusterNamespace, "mantle-controller", "-o", "json") + if err != nil { + return nil, fmt.Errorf("failed to get deploy: %w", err) + } + var deploy appsv1.Deployment + if err := json.Unmarshal(stdout, &deploy); err != nil { + return nil, fmt.Errorf("failed to unmarshal deploy: %w", err) + } + args := deploy.Spec.Template.Spec.Containers[0].Args + endpointIndex := slices.IndexFunc(args, func(s string) bool { + return strings.HasPrefix(s, "--object-storage-endpoint=") + }) + if endpointIndex == -1 { + return nil, errors.New("failed to find object storage endpoint") + } + objectStorageEndpoint, _ := strings.CutPrefix(args[endpointIndex], "--object-storage-endpoint=") + + // Get the bucket name from the OBC. + stdout, _, err = Kubectl(SecondaryK8sCluster, nil, + "get", "obc", "-n", CephClusterNamespace, "export-data", "-o", "json") + if err != nil { + return nil, fmt.Errorf("failed to get obc: %w", err) + } + var obc struct { + Spec struct { + BucketName string `json:"bucketName"` + } `json:"spec"` + } + if err := json.Unmarshal(stdout, &obc); err != nil { + return nil, fmt.Errorf("failed to unmarshal obc: %w", err) + } + + // Get the credentials from the Secret. + stdout, _, err = Kubectl(SecondaryK8sCluster, nil, + "get", "secret", "-n", CephClusterNamespace, "export-data", "-o", "json") + if err != nil { + return nil, fmt.Errorf("failed to get export-data secret: %w", err) + } + var secret corev1.Secret + if err := json.Unmarshal(stdout, &secret); err != nil { + return nil, fmt.Errorf("failed to unmarshal secret: %w", err) + } + awsAccessKeyID := secret.Data["AWS_ACCESS_KEY_ID"] + awsSecretAccessKey := secret.Data["AWS_SECRET_ACCESS_KEY"] + + // Construct a S3 client. + sdkConfig, err := config.LoadDefaultConfig( + ctx, + config.WithRegion("ceph"), + config.WithCredentialsProvider( + aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: string(awsAccessKeyID), + SecretAccessKey: string(awsSecretAccessKey), + }, nil + }), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to load default config: %w", err) + } + s3Client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) { + o.BaseEndpoint = &objectStorageEndpoint + o.UsePathStyle = true + }) + + return &ObjectStorageClient{cli: s3Client, bucketName: obc.Spec.BucketName}, nil +} + +// IsJobConditionTrue returns true when the conditionType is present and set to +// `metav1.ConditionTrue`. Otherwise, it returns false. Note that we can't use +// meta.IsStatusConditionTrue because it doesn't accept []JobCondition. 
+func IsJobConditionTrue(conditions []batchv1.JobCondition, conditionType batchv1.JobConditionType) bool {
+    for _, cond := range conditions {
+        if cond.Type == conditionType && cond.Status == corev1.ConditionTrue {
+            return true
+        }
+    }
+    return false
+}
+
+func WaitControllerToBeReady() {
+    GinkgoHelper()
+    It("wait for mantle-controller to be ready", func() {
+        Eventually(func() error {
+            return CheckDeploymentReady(PrimaryK8sCluster, "rook-ceph", "mantle-controller")
+        }).Should(Succeed())
+
+        Eventually(func() error {
+            return CheckDeploymentReady(SecondaryK8sCluster, "rook-ceph", "mantle-controller")
+        }).Should(Succeed())
+    })
+}
+
+func SetupEnvironment(namespace string) {
+    GinkgoHelper()
+    By("setting up the environment")
+    Eventually(func() error {
+        return CreateNamespace(PrimaryK8sCluster, namespace)
+    }).Should(Succeed())
+    Eventually(func() error {
+        return CreateNamespace(SecondaryK8sCluster, namespace)
+    }).Should(Succeed())
+    Eventually(func() error {
+        return ApplyRBDPoolAndSCTemplate(PrimaryK8sCluster, CephClusterNamespace)
+    }).Should(Succeed())
+    Eventually(func() error {
+        return ApplyRBDPoolAndSCTemplate(SecondaryK8sCluster, CephClusterNamespace)
+    }).Should(Succeed())
+}
+
+func CreatePVC(ctx context.Context, cluster int, namespace, name string) {
+    Eventually(ctx, func() error {
+        return ApplyPVCTemplate(cluster, namespace, name)
+    }).Should(Succeed())
+}
+
+func WriteRandomDataToPV(ctx context.Context, cluster int, namespace, pvcName string) string {
+    GinkgoHelper()
+    By("writing some random data to PV(C)")
+    writeJobName := util.GetUniqueName("job-")
+    Eventually(ctx, func() error {
+        return ApplyWriteJobTemplate(cluster, namespace, writeJobName, pvcName)
+    }).Should(Succeed())
+    Eventually(ctx, func(g Gomega) {
+        job, err := GetJob(cluster, namespace, writeJobName)
+        g.Expect(err).NotTo(HaveOccurred())
+        g.Expect(IsJobConditionTrue(job.Status.Conditions, batchv1.JobComplete)).To(BeTrue())
+    }).Should(Succeed())
+    stdout, _, err := Kubectl(cluster, nil, "logs", "-n", namespace, "job/"+writeJobName)
+    Expect(err).NotTo(HaveOccurred())
+    Expect(len(stdout)).NotTo(Equal(0))
+    return string(stdout)
+}
+
+func CreateMantleBackup(cluster int, namespace, pvcName, backupName string) {
+    GinkgoHelper()
+    By("creating a MantleBackup object")
+    Eventually(func() error {
+        return ApplyMantleBackupTemplate(cluster, namespace, pvcName, backupName)
+    }).Should(Succeed())
+}
+
+func WaitMantleBackupReadyToUse(cluster int, namespace, backupName string) {
+    GinkgoHelper()
+    By("checking MantleBackup's ReadyToUse status")
+    Eventually(func() error {
+        mb, err := GetMB(cluster, namespace, backupName)
+        if err != nil {
+            return err
+        }
+        if !meta.IsStatusConditionTrue(mb.Status.Conditions, mantlev1.BackupConditionReadyToUse) {
+            return errors.New("status of ReadyToUse condition is not True")
+        }
+        return nil
+    }, "10m", "1s").Should(Succeed())
+}
+
+func WaitMantleBackupSynced(namespace, backupName string) {
+    GinkgoHelper()
+    By("checking MantleBackup's SyncedToRemote status")
+    Eventually(func() error {
+        mb, err := GetMB(PrimaryK8sCluster, namespace, backupName)
+        if err != nil {
+            return err
+        }
+        if !meta.IsStatusConditionTrue(mb.Status.Conditions, mantlev1.BackupConditionSyncedToRemote) {
+            return errors.New("status of SyncedToRemote condition is not True")
+        }
+        return nil
+    }, "10m", "1s").Should(Succeed())
+}
+
+func EnsureMantleBackupNotExist(ctx context.Context, cluster int, namespace, backupName string) {
+    By("checking MantleBackup doesn't exist")
+    Consistently(ctx,
func(g Gomega) { + mbs, err := GetMBList(cluster, namespace) + g.Expect(err).NotTo(HaveOccurred()) + exist := slices.ContainsFunc(mbs.Items, func(mb mantlev1.MantleBackup) bool { + return mb.GetName() == backupName + }) + g.Expect(exist).To(BeFalse()) + }, "10s").Should(Succeed()) +} + +func EnsureTemporaryResourcesDeleted(ctx context.Context) { + GinkgoHelper() + By("checking all temporary Jobs related to export and import of RBD images are removed") + primaryJobList, err := GetObjectList[batchv1.JobList](PrimaryK8sCluster, "job", CephClusterNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(primaryJobList.Items, func(job batchv1.Job) bool { + n := job.GetName() + return strings.HasPrefix(n, controller.MantleExportJobPrefix) || + strings.HasPrefix(n, controller.MantleUploadJobPrefix) + })).To(BeFalse()) + secondaryJobList, err := GetObjectList[batchv1.JobList](SecondaryK8sCluster, "job", CephClusterNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(secondaryJobList.Items, func(job batchv1.Job) bool { + n := job.GetName() + return strings.HasPrefix(n, controller.MantleImportJobPrefix) || + strings.HasPrefix(n, controller.MantleDiscardJobPrefix) + })).To(BeFalse()) + + By("checking all temporary PVCs related to export and import of RBD images are removed") + primaryPVCList, err := GetObjectList[corev1.PersistentVolumeClaimList]( + PrimaryK8sCluster, "pvc", CephClusterNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(primaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool { + n := pvc.GetName() + return strings.HasPrefix(n, controller.MantleExportDataPVCPrefix) + })).To(BeFalse()) + secondaryPVCList, err := GetObjectList[corev1.PersistentVolumeClaimList]( + SecondaryK8sCluster, "pvc", CephClusterNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(secondaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool { + n := pvc.GetName() + return strings.HasPrefix(n, controller.MantleDiscardPVCPrefix) + })).To(BeFalse()) + + By("checking all temporary PVs related to export and import of RBD images are removed") + secondaryPVList, err := GetObjectList[corev1.PersistentVolumeList](SecondaryK8sCluster, "pv", CephClusterNamespace) + Expect(err).NotTo(HaveOccurred()) + Expect(slices.ContainsFunc(secondaryPVList.Items, func(pv corev1.PersistentVolume) bool { + n := pv.GetName() + return strings.HasPrefix(n, controller.MantleDiscardPVPrefix) + })).To(BeFalse()) + + By("checking all temporary objects in the object storage related to export and import of RBD images are removed") + objectStorageClient, err := CreateObjectStorageClient(ctx) + Expect(err).NotTo(HaveOccurred()) + listOutput, err := objectStorageClient.listObjects(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(len(listOutput.Contents)).To(Equal(0)) +} + +func EnsureCorrectRestoration( + clusterNo int, + ctx context.Context, + namespace, backupName, restoreName, writtenDataHash string, +) { + GinkgoHelper() + mountDeployName := util.GetUniqueName("deploy-") + clusterName := "primary" + if clusterNo == SecondaryK8sCluster { + clusterName = "secondary" + } + By(fmt.Sprintf("%s: %s: creating MantleRestore by using the MantleBackup replicated above", + clusterName, backupName)) + Eventually(ctx, func() error { + return ApplyMantleRestoreTemplate(clusterNo, namespace, restoreName, backupName) + }).Should(Succeed()) + By(fmt.Sprintf("%s: %s: checking the MantleRestore can be ready to use", clusterName, backupName)) + Eventually(ctx, 
func(g Gomega) { + mr, err := GetMR(clusterNo, namespace, restoreName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(meta.IsStatusConditionTrue(mr.Status.Conditions, "ReadyToUse")).To(BeTrue()) + }).Should(Succeed()) + By(fmt.Sprintf("%s: %s: checking the MantleRestore has the correct contents", clusterName, backupName)) + Eventually(ctx, func(g Gomega) { + err := ApplyMountDeployTemplate(clusterNo, namespace, mountDeployName, restoreName) + g.Expect(err).NotTo(HaveOccurred()) + stdout, _, err := Kubectl(clusterNo, nil, "exec", "-n", namespace, "deploy/"+mountDeployName, "--", + "bash", "-c", "sha256sum /volume/data | awk '{print $1}'") + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(string(stdout)).To(Equal(writtenDataHash)) + }).Should(Succeed()) +} diff --git a/test/e2e/multik8s/util.go b/test/e2e/multik8s/util.go deleted file mode 100644 index 2a28c2d9..00000000 --- a/test/e2e/multik8s/util.go +++ /dev/null @@ -1,362 +0,0 @@ -package multik8s - -import ( - "bytes" - "context" - _ "embed" - "encoding/json" - "errors" - "fmt" - "os" - "os/exec" - "slices" - "strings" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" - mantlev1 "github.com/cybozu-go/mantle/api/v1" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" -) - -const ( - cephClusterNamespace = "rook-ceph" - primaryK8sCluster = 1 - secondaryK8sCluster = 2 -) - -var ( - //go:embed testdata/pvc-template.yaml - testPVCTemplate string - //go:embed testdata/rbd-pool-sc-template.yaml - testRBDPoolSCTemplate string - //go:embed testdata/mantlebackup-template.yaml - testMantleBackupTemplate string - //go:embed testdata/mantlerestore-template.yaml - testMantleRestoreTemplate string - //go:embed testdata/mount-deploy-template.yaml - mountDeployTemplate string - //go:embed testdata/write-job-template.yaml - writeJobTemplate string - - kubectlPrefixPrimary = os.Getenv("KUBECTL_PRIMARY") - kubectlPrefixSecondary = os.Getenv("KUBECTL_SECONDARY") -) - -func execAtLocal(cmd string, input []byte, args ...string) ([]byte, []byte, error) { - var stdout, stderr bytes.Buffer - command := exec.Command(cmd, args...) - command.Stdout = &stdout - command.Stderr = &stderr - - if len(input) != 0 { - command.Stdin = bytes.NewReader(input) - } - - err := command.Run() - return stdout.Bytes(), stderr.Bytes(), err -} - -// input can be nil -func kubectl(clusterNo int, input []byte, args ...string) ([]byte, []byte, error) { - kubectlPrefix := "" - switch clusterNo { - case primaryK8sCluster: - kubectlPrefix = kubectlPrefixPrimary - case secondaryK8sCluster: - kubectlPrefix = kubectlPrefixSecondary - default: - panic(fmt.Sprintf("invalid clusterNo: %d", clusterNo)) - } - if len(kubectlPrefix) == 0 { - panic("Either KUBECTL_PRIMARY or KUBECTL_SECONDARY environment variable is not set") - } - fields := strings.Fields(kubectlPrefix) - fields = append(fields, args...) - return execAtLocal(fields[0], input, fields[1:]...) -} - -func checkDeploymentReady(clusterNo int, namespace, name string) error { - _, stderr, err := kubectl( - clusterNo, nil, - "-n", namespace, "wait", "--for=condition=Available", "deploy", name, "--timeout=1m", - ) - if err != nil { - return fmt.Errorf("kubectl wait deploy failed. 
stderr: %s, err: %w", string(stderr), err) - } - return nil -} - -func applyMantleBackupTemplate(clusterNo int, namespace, pvcName, backupName string) error { - manifest := fmt.Sprintf(testMantleBackupTemplate, backupName, backupName, namespace, pvcName) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-f", "-") - if err != nil { - return fmt.Errorf("kubectl apply mantlebackup failed. err: %w", err) - } - return nil -} - -func applyMantleRestoreTemplate(clusterNo int, namespace, restoreName, backupName string) error { - manifest := fmt.Sprintf(testMantleRestoreTemplate, restoreName, restoreName, namespace, backupName) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-f", "-") - if err != nil { - return fmt.Errorf("kubectl apply mantlerestore failed. err: %w", err) - } - return nil -} - -func applyPVCTemplate(clusterNo int, namespace, name string) error { - manifest := fmt.Sprintf(testPVCTemplate, name) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") - if err != nil { - return fmt.Errorf("kubectl apply pvc failed. err: %w", err) - } - return nil -} - -func applyMountDeployTemplate(clusterNo int, namespace, name, pvcName string) error { - manifest := fmt.Sprintf(mountDeployTemplate, name, namespace, name, name, pvcName) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") - if err != nil { - return fmt.Errorf("kubectl apply mount deploy failed. err: %w", err) - } - return nil -} - -func applyWriteJobTemplate(clusterNo int, namespace, name, pvcName string) error { - manifest := fmt.Sprintf(writeJobTemplate, name, namespace, pvcName) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") - if err != nil { - return fmt.Errorf("kubectl apply write job failed. err: %w", err) - } - return nil -} - -func createNamespace(clusterNo int, name string) error { - _, _, err := kubectl(clusterNo, nil, "create", "ns", name) - if err != nil { - return fmt.Errorf("kubectl create ns failed. 
err: %w", err) - } - return nil -} - -func applyRBDPoolAndSCTemplate(clusterNo int, namespace string) error { //nolint:unparam - manifest := fmt.Sprintf( - testRBDPoolSCTemplate, namespace, - namespace, namespace, namespace, namespace) - _, _, err := kubectl(clusterNo, []byte(manifest), "apply", "-n", namespace, "-f", "-") - if err != nil { - return err - } - return nil -} - -func getObject[T any](clusterNo int, kind, namespace, name string) (*T, error) { - stdout, _, err := kubectl(clusterNo, nil, "get", kind, "-n", namespace, name, "-o", "json") - if err != nil { - return nil, err - } - - var obj T - if err := json.Unmarshal(stdout, &obj); err != nil { - return nil, err - } - - return &obj, nil -} - -func getMB(clusterNo int, namespace, name string) (*mantlev1.MantleBackup, error) { - return getObject[mantlev1.MantleBackup](clusterNo, "mantlebackup", namespace, name) -} - -func getPVC(clusterNo int, namespace, name string) (*corev1.PersistentVolumeClaim, error) { - return getObject[corev1.PersistentVolumeClaim](clusterNo, "pvc", namespace, name) -} - -func getMR(clusterNo int, namespace, name string) (*mantlev1.MantleRestore, error) { - return getObject[mantlev1.MantleRestore](clusterNo, "mantlerestore", namespace, name) -} - -func getDeploy(clusterNo int, namespace, name string) (*appsv1.Deployment, error) { - return getObject[appsv1.Deployment](clusterNo, "deploy", namespace, name) -} - -func getJob(clusterNo int, namespace, name string) (*batchv1.Job, error) { - return getObject[batchv1.Job](clusterNo, "job", namespace, name) -} - -func getObjectList[T any](clusterNo int, kind, namespace string) (*T, error) { - var stdout []byte - var err error - if namespace == "" { - stdout, _, err = kubectl(clusterNo, nil, "get", kind, "-o", "json") - } else { - stdout, _, err = kubectl(clusterNo, nil, "get", kind, "-n", namespace, "-o", "json") - } - if err != nil { - return nil, err - } - - var objList T - if err := json.Unmarshal(stdout, &objList); err != nil { - return nil, err - } - - return &objList, nil -} - -func changeClusterRole(clusterNo int, newRole string) error { - deployName := "mantle-controller" - deploy, err := getDeploy(clusterNo, cephClusterNamespace, deployName) - if err != nil { - return fmt.Errorf("failed to get mantle-controller deploy: %w", err) - } - - roleIndex := slices.IndexFunc( - deploy.Spec.Template.Spec.Containers[0].Args, - func(arg string) bool { return strings.HasPrefix(arg, "--role=") }, - ) - if roleIndex == -1 { - return errors.New("failed to find --role= argument") - } - - _, _, err = kubectl( - clusterNo, nil, "patch", "deploy", "-n", cephClusterNamespace, deployName, "--type=json", - fmt.Sprintf( - `-p=[{"op": "replace", "path": "/spec/template/spec/containers/0/args/%d", "value":"--role=%s"}]`, - roleIndex, - newRole, - ), - ) - if err != nil { - return fmt.Errorf("failed to patch mantle-controller deploy: %w", err) - } - - // Wait for the new controller to start - numRetries := 10 - for i := 0; i < numRetries; i++ { - stdout, _, err := kubectl(clusterNo, nil, "get", "pod", "-n", cephClusterNamespace, "-o", "json") - if err != nil { - return fmt.Errorf("failed to get pod: %w", err) - } - var pods corev1.PodList - err = json.Unmarshal(stdout, &pods) - if err != nil { - return fmt.Errorf("failed to unmarshal pod list: %w", err) - } - ready := true - for _, pod := range pods.Items { - if strings.HasPrefix(pod.GetName(), deployName) { - for _, container := range pod.Spec.Containers { - if !slices.Contains(container.Args, fmt.Sprintf("--role=%s", newRole)) { - 
ready = false - } - } - } - } - if ready { - break - } - time.Sleep(10 * time.Second) - } - - return nil -} - -type objectStorageClient struct { - cli *s3.Client - bucketName string -} - -func (c *objectStorageClient) listObjects(ctx context.Context) (*s3.ListObjectsV2Output, error) { - return c.cli.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ - Bucket: &c.bucketName, - }) -} - -func createObjectStorageClient(ctx context.Context) (*objectStorageClient, error) { - // Find the endpoint of the object storage from the command-line arguments for mantle-controller. - stdout, _, err := kubectl(primaryK8sCluster, nil, - "get", "deploy", "-n", cephClusterNamespace, "mantle-controller", "-o", "json") - if err != nil { - return nil, fmt.Errorf("failed to get deploy: %w", err) - } - var deploy appsv1.Deployment - if err := json.Unmarshal(stdout, &deploy); err != nil { - return nil, fmt.Errorf("failed to unmarshal deploy: %w", err) - } - args := deploy.Spec.Template.Spec.Containers[0].Args - endpointIndex := slices.IndexFunc(args, func(s string) bool { - return strings.HasPrefix(s, "--object-storage-endpoint=") - }) - if endpointIndex == -1 { - return nil, errors.New("failed to find object storage endpoint") - } - objectStorageEndpoint, _ := strings.CutPrefix(args[endpointIndex], "--object-storage-endpoint=") - - // Get the bucket name from the OBC. - stdout, _, err = kubectl(secondaryK8sCluster, nil, - "get", "obc", "-n", cephClusterNamespace, "export-data", "-o", "json") - if err != nil { - return nil, fmt.Errorf("failed to get obc: %w", err) - } - var obc struct { - Spec struct { - BucketName string `json:"bucketName"` - } `json:"spec"` - } - if err := json.Unmarshal(stdout, &obc); err != nil { - return nil, fmt.Errorf("failed to unmarshal obc: %w", err) - } - - // Get the credentials from the Secret. - stdout, _, err = kubectl(secondaryK8sCluster, nil, - "get", "secret", "-n", cephClusterNamespace, "export-data", "-o", "json") - if err != nil { - return nil, fmt.Errorf("failed to get export-data secret: %w", err) - } - var secret corev1.Secret - if err := json.Unmarshal(stdout, &secret); err != nil { - return nil, fmt.Errorf("failed to unmarshal secret: %w", err) - } - awsAccessKeyID := secret.Data["AWS_ACCESS_KEY_ID"] - awsSecretAccessKey := secret.Data["AWS_SECRET_ACCESS_KEY"] - - // Construct a S3 client. - sdkConfig, err := config.LoadDefaultConfig( - ctx, - config.WithRegion("ceph"), - config.WithCredentialsProvider( - aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { - return aws.Credentials{ - AccessKeyID: string(awsAccessKeyID), - SecretAccessKey: string(awsSecretAccessKey), - }, nil - }), - ), - ) - if err != nil { - return nil, fmt.Errorf("failed to load default config: %w", err) - } - s3Client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) { - o.BaseEndpoint = &objectStorageEndpoint - o.UsePathStyle = true - }) - - return &objectStorageClient{cli: s3Client, bucketName: obc.Spec.BucketName}, nil -} - -// IsJobConditionTrue returns true when the conditionType is present and set to -// `metav1.ConditionTrue`. Otherwise, it returns false. Note that we can't use -// meta.IsStatusConditionTrue because it doesn't accept []JobCondition. 
-func IsJobConditionTrue(conditions []batchv1.JobCondition, conditionType batchv1.JobConditionType) bool { - for _, cond := range conditions { - if cond.Type == conditionType && cond.Status == corev1.ConditionTrue { - return true - } - } - return false -} diff --git a/test/e2e/testdata/values-mantle-primary-template.yaml b/test/e2e/testdata/values-mantle-primary-template.yaml index 6da05416..f487a52b 100644 --- a/test/e2e/testdata/values-mantle-primary-template.yaml +++ b/test/e2e/testdata/values-mantle-primary-template.yaml @@ -9,3 +9,6 @@ controller: #httpProxy: http://host.minikube.internal:8899 #httpsProxy: http://host.minikube.internal:8899 #noProxy: localhost,127.0.0.1,10.96.0.0/12 + env: + - name: REQUEUE_RECONCILIATION_AFTER + value: "1s" diff --git a/test/e2e/testdata/values-mantle-secondary-template.yaml b/test/e2e/testdata/values-mantle-secondary-template.yaml index cc2c857d..ccfab6e9 100644 --- a/test/e2e/testdata/values-mantle-secondary-template.yaml +++ b/test/e2e/testdata/values-mantle-secondary-template.yaml @@ -7,6 +7,9 @@ controller: objectStorageEndpoint: {OBJECT_STORAGE_ENDPOINT} envSecret: export-data gcInterval: 1s + env: + - name: REQUEUE_RECONCILIATION_IMMEDIATELY + value: "1" secondaryService: type: NodePort