diff --git a/PROJECT b/PROJECT index c391e511..34e9ae97 100644 --- a/PROJECT +++ b/PROJECT @@ -35,4 +35,8 @@ resources: kind: MantleBackupConfig path: github.com/cybozu-go/mantle/api/v1 version: v1 +- controller: true + domain: cybozu.io + kind: PersistentVolume + version: v1 version: "3" diff --git a/charts/mantle-cluster-wide/templates/clusterrole.yaml b/charts/mantle-cluster-wide/templates/clusterrole.yaml index b11e0862..087cb866 100644 --- a/charts/mantle-cluster-wide/templates/clusterrole.yaml +++ b/charts/mantle-cluster-wide/templates/clusterrole.yaml @@ -27,6 +27,20 @@ rules: - patch - update - watch + - apiGroups: + - "" + resources: + - persistentvolumes/finalizers + verbs: + - update + - apiGroups: + - "" + resources: + - persistentvolumes/status + verbs: + - get + - patch + - update - apiGroups: - batch resources: diff --git a/charts/mantle/templates/deployment.yaml b/charts/mantle/templates/deployment.yaml index bb5fc49a..ef4eb440 100644 --- a/charts/mantle/templates/deployment.yaml +++ b/charts/mantle/templates/deployment.yaml @@ -85,6 +85,9 @@ spec: {{- with .Values.controller.exportDataStorageClass }} - --export-data-storage-class={{ . }} {{- end }} + {{- with .Values.controller.gcInterval }} + - --gc-interval={{ . }} + {{- end }} env: - name: POD_NAME valueFrom: diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 572631cc..f9837be9 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -56,6 +56,7 @@ var ( objectStorageEndpoint string caCertConfigMapSrc string caCertKeySrc string + gcInterval string scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") @@ -95,6 +96,8 @@ func init() { flags.StringVar(&caCertKeySrc, "ca-cert-key", "ca.crt", "The key of the ConfigMap specified by --ca-cert-config-map that contains the intermediate certificate. "+ "The default value is ca.crt. 
This option is just ignored if --ca-cert-configmap isn't specified.")
+	flags.StringVar(&gcInterval, "gc-interval", "1h",
+		"The time period between each garbage collection for orphaned resources.")
 
 	goflags := flag.NewFlagSet("goflags", flag.ExitOnError)
 	zapOpts.Development = true
@@ -151,6 +154,17 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
 		caCertConfigMap = &caCertConfigMapSrc
 	}
 
+	parsedGCInterval, err := time.ParseDuration(gcInterval)
+	if err != nil {
+		setupLog.Error(err, "failed to parse gc interval", "gcInterval", gcInterval)
+		return err
+	}
+	if parsedGCInterval < 1*time.Second {
+		err := fmt.Errorf("the specified gc interval is too short: %s", parsedGCInterval.String())
+		setupLog.Error(err, "failed to validate gc interval", "gcInterval", gcInterval)
+		return err
+	}
+
 	backupReconciler := controller.NewMantleBackupReconciler(
 		mgr.GetClient(),
 		mgr.GetScheme(),
@@ -173,7 +187,6 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
 
 	restoreReconciler := controller.NewMantleRestoreReconciler(
 		mgr.GetClient(),
-		mgr.GetAPIReader(),
 		mgr.GetScheme(),
 		managedCephClusterID,
 		role,
@@ -193,8 +206,25 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
 		setupLog.Error(err, "unable to create controller", "controller", "MantleBackupConfig")
 		return err
 	}
+
+	if err := controller.NewPersistentVolumeReconciler(
+		mgr.GetClient(),
+		mgr.GetScheme(),
+		managedCephClusterID,
+	).SetupWithManager(mgr); err != nil {
+		setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeReconciler")
+		return err
+	}
+
 	//+kubebuilder:scaffold:builder
 
+	if err := mgr.Add(
+		controller.NewGarbageCollectorRunner(mgr.GetClient(), parsedGCInterval, managedCephClusterID),
+	); err != nil {
+		setupLog.Error(err, "unable to create runner", "runner", "GarbageCollectorRunner")
+		return err
+	}
+
 	return nil
 }
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 072e30a3..b52c6a37 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -27,6 +27,20 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - persistentvolumes/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - ""
+  resources:
+  - persistentvolumes/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - batch
   resources:
diff --git a/internal/controller/garbage_collector_runner.go b/internal/controller/garbage_collector_runner.go
new file mode 100644
index 00000000..ae467442
--- /dev/null
+++ b/internal/controller/garbage_collector_runner.go
@@ -0,0 +1,135 @@
+package controller
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	mantlev1 "github.com/cybozu-go/mantle/api/v1"
+	corev1 "k8s.io/api/core/v1"
+	aerrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+type GarbageCollectorRunner struct {
+	client               client.Client
+	interval             time.Duration
+	managedCephClusterID string
+}
+
+func NewGarbageCollectorRunner(
+	client client.Client,
+	interval time.Duration,
+	managedCephClusterID string,
+) *GarbageCollectorRunner {
+	return &GarbageCollectorRunner{
+		client:               client,
+		interval:             interval,
+		managedCephClusterID: managedCephClusterID,
+	}
+}
+
+func (r *GarbageCollectorRunner) Start(ctx context.Context) error {
+	logger := log.FromContext(ctx)
+
+	for {
+		ctxSleep, cancelSleep
:= context.WithTimeout(ctx, r.interval) + <-ctxSleep.Done() + cancelSleep() + if ctx.Err() != nil { + break + } + + logger.Info("garbage collection started") + + if err := r.deleteOrphanedPVs(ctx); err != nil { + logger.Error(err, "failed to delete orphaned PVs", "error", err) + } + + logger.Info("garbage collection finished") + } + + return nil +} + +func (r *GarbageCollectorRunner) deleteOrphanedPVs(ctx context.Context) error { + logger := log.FromContext(ctx) + + requirement, err := labels.NewRequirement(labelRestoringPVKey, selection.Exists, []string{}) + if err != nil { + return fmt.Errorf("failed to create a new labels requirement: %w", err) + } + selector := labels.ValidatedSetSelector{}.Add(*requirement) + + var pvList corev1.PersistentVolumeList + if err := r.client.List(ctx, &pvList, &client.ListOptions{ + LabelSelector: selector, + }); err != nil { + return fmt.Errorf("failed to list PVs: %w", err) + } + + for _, pv := range pvList.Items { + shouldDelete, err := r.isMantleRestoreAlreadyDeleted(ctx, &pv) + if err != nil { + return fmt.Errorf("failed to check if a PV should be deleted: %w", err) + } + if !shouldDelete { + continue + } + if err := r.client.Delete(ctx, &pv, &client.DeleteOptions{ + Preconditions: &metav1.Preconditions{UID: &pv.ObjectMeta.UID, ResourceVersion: &pv.ObjectMeta.ResourceVersion}, + }); err != nil { + return fmt.Errorf("failed to delete PV: %w", err) + } + logger.Info("an orphaned PV is removed", "name", pv.GetName()) + } + + return nil +} + +func (r *GarbageCollectorRunner) isMantleRestoreAlreadyDeleted(ctx context.Context, pv *corev1.PersistentVolume) (bool, error) { + restoreUID, ok := pv.GetAnnotations()[PVAnnotationRestoredBy] + if !ok { + return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredBy, pv.GetName()) + } + restoreName, ok := pv.GetAnnotations()[PVAnnotationRestoredByName] + if !ok { + return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredByName, pv.GetName()) + } + restoreNamespace, ok := pv.GetAnnotations()[PVAnnotationRestoredByNamespace] + if !ok { + return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredByNamespace, pv.GetName()) + } + + clusterID, ok := pv.Spec.CSI.VolumeAttributes["clusterID"] + if !ok { + return false, fmt.Errorf("failed to find cluster ID: %s", pv.GetName()) + } + if r.managedCephClusterID != clusterID { + return false, nil + } + + var restore mantlev1.MantleRestore + if err := r.client.Get( + ctx, + types.NamespacedName{Name: restoreName, Namespace: restoreNamespace}, + &restore, + ); err != nil { + if aerrors.IsNotFound(err) { + return true, nil + } else { + return false, fmt.Errorf("failed to get MantleRestore: %s: %w", pv.GetName(), err) + } + } + + if string(restore.GetUID()) != restoreUID { + return true, nil + } + + return false, nil +} diff --git a/internal/controller/garbage_collector_runner_test.go b/internal/controller/garbage_collector_runner_test.go new file mode 100644 index 00000000..51dbfcaf --- /dev/null +++ b/internal/controller/garbage_collector_runner_test.go @@ -0,0 +1,167 @@ +package controller + +import ( + "time" + + mantlev1 "github.com/cybozu-go/mantle/api/v1" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("garbage collector", func() { + var err error + + Context("isMantleRestoreAlreadyDeleted", func() { + doTest := + func( + ctx SpecContext, + existMR bool, + pvModifier func(pv *corev1.PersistentVolume), + expectError, expectDeleted bool, + ) { + runner := NewGarbageCollectorRunner(k8sClient, 1*time.Second, "rook-ceph") + ns := resMgr.CreateNamespace() + + var mr mantlev1.MantleRestore + mr.SetName("mr") + mr.SetNamespace(ns) + mr.Spec.Backup = "dummy" + if existMR { + err = k8sClient.Create(ctx, &mr) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{Name: mr.GetName(), Namespace: mr.GetNamespace()}, &mr) + Expect(err).NotTo(HaveOccurred()) + } + + var pv corev1.PersistentVolume + pv.SetAnnotations(map[string]string{ + PVAnnotationRestoredBy: string(mr.GetUID()), + PVAnnotationRestoredByName: "mr", + PVAnnotationRestoredByNamespace: ns, + }) + pv.Spec.CSI = &corev1.CSIPersistentVolumeSource{ + VolumeAttributes: map[string]string{"clusterID": "rook-ceph"}, + } + if pvModifier != nil { + pvModifier(&pv) + } + + deleted, err := runner.isMantleRestoreAlreadyDeleted(ctx, &pv) + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + } + Expect(deleted).To(Equal(expectDeleted)) + } + + DescribeTable("isMantleRestoreAlreadyDeleted", + doTest, + Entry("MantleRestore exists", true, nil, false, false), + Entry("MantleRestore does NOT exist", false, nil, false, true), + Entry( + "MantleRestore exists, but the PV's annotation has unexpected MR's UID", + true, + func(pv *corev1.PersistentVolume) { + pv.Annotations[PVAnnotationRestoredBy] = "unexpected-uid" + }, + false, true, + ), + Entry( + "PV annotation of restored-by missing", + true, + func(pv *corev1.PersistentVolume) { + delete(pv.Annotations, PVAnnotationRestoredBy) + }, + true, false, + ), + Entry( + "PV annotation of restored-by-name missing", + true, + func(pv *corev1.PersistentVolume) { + delete(pv.Annotations, PVAnnotationRestoredByName) + }, + true, false, + ), + Entry( + "PV annotation of restored-by-namespace missing", + true, + func(pv *corev1.PersistentVolume) { + delete(pv.Annotations, PVAnnotationRestoredByNamespace) + }, + true, false, + ), + Entry( + "cluster ID does not match", + true, + func(pv *corev1.PersistentVolume) { + pv.Spec.CSI.VolumeAttributes["clusterID"] = "different-cluster-id" + }, + false, false, + ), + ) + }) + + Context("deleteOrphanedPVs", func() { + It("should remove only orphaned PVs", func(ctx SpecContext) { + runner := NewGarbageCollectorRunner(k8sClient, 1*time.Second, resMgr.ClusterID) + ns := resMgr.CreateNamespace() + + // Create MantleRestore mr1 + mr1 := mantlev1.MantleRestore{} + mr1.SetName("mr1") + mr1.SetNamespace(ns) + mr1.Spec.Backup = "dummy" + err = k8sClient.Create(ctx, &mr1) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{Name: mr1.GetName(), Namespace: mr1.GetNamespace()}, &mr1) + Expect(err).NotTo(HaveOccurred()) + + // Create PV pv1 referring to mr1 + pv1, _, err := resMgr.CreateUniquePVAndPVC(ctx, ns) + Expect(err).NotTo(HaveOccurred()) + pv1.SetLabels(map[string]string{ + labelRestoringPVKey: labelRestoringPVValue, + }) + pv1.SetAnnotations(map[string]string{ + PVAnnotationRestoredBy: string(mr1.GetUID()), + PVAnnotationRestoredByName: mr1.GetName(), + PVAnnotationRestoredByNamespace: mr1.GetNamespace(), + }) + err = k8sClient.Update(ctx, pv1) + 
Expect(err).NotTo(HaveOccurred()) + + // Create PV pv2 not referring to any MantleRestores + pv2, _, err := resMgr.CreateUniquePVAndPVC(ctx, ns) + Expect(err).NotTo(HaveOccurred()) + pv2.SetLabels(map[string]string{ + labelRestoringPVKey: labelRestoringPVValue, + }) + pv2.SetAnnotations(map[string]string{ + PVAnnotationRestoredBy: "non-existing-uid", + PVAnnotationRestoredByName: "non-existing-name", + PVAnnotationRestoredByNamespace: "non-existing-namespace", + }) + err = k8sClient.Update(ctx, pv2) + Expect(err).NotTo(HaveOccurred()) + + // No MantleRestore referring to pv2 exists, so pv2 is orphaned and should be removed. + + // Perform deleteOrphanedPVs. + err = runner.deleteOrphanedPVs(ctx) + Expect(err).NotTo(HaveOccurred()) + + // pv1 should NOT be deleted. + err = k8sClient.Get(ctx, types.NamespacedName{Name: pv1.GetName()}, pv1) + Expect(err).NotTo(HaveOccurred()) + Expect(pv1.DeletionTimestamp.IsZero()).To(BeTrue()) + + // pv2 should be deleted. + err = k8sClient.Get(ctx, types.NamespacedName{Name: pv2.GetName()}, pv2) + Expect(err).NotTo(HaveOccurred()) + Expect(pv2.DeletionTimestamp.IsZero()).To(BeFalse()) + }) + }) +}) diff --git a/internal/controller/mantlebackup_controller.go b/internal/controller/mantlebackup_controller.go index 0718d59d..0b2c7709 100644 --- a/internal/controller/mantlebackup_controller.go +++ b/internal/controller/mantlebackup_controller.go @@ -423,6 +423,10 @@ func (r *MantleBackupReconciler) reconcileAsPrimary(ctx context.Context, backup func (r *MantleBackupReconciler) reconcileAsSecondary(ctx context.Context, backup *mantlev1.MantleBackup) (ctrl.Result, error) { logger := log.FromContext(ctx) + if err := r.prepareObjectStorageClient(ctx); err != nil { + return ctrl.Result{}, err + } + if !isCreatedWhenMantleControllerWasSecondary(backup) { logger.Info( "skipping to reconcile the MantleBackup created by a different mantle-controller to prevent accidental data loss", @@ -1474,9 +1478,6 @@ func (r *MantleBackupReconciler) isExportDataAlreadyUploaded( ctx context.Context, target *mantlev1.MantleBackup, ) (ctrl.Result, error) { - if err := r.prepareObjectStorageClient(ctx); err != nil { - return ctrl.Result{}, err - } uploaded, err := r.objectStorageClient.Exists( ctx, makeObjectNameOfExportedData(target.GetName(), target.GetAnnotations()[annotRemoteUID]), diff --git a/internal/controller/mantlerestore_controller.go b/internal/controller/mantlerestore_controller.go index 53c05482..d4b52d08 100644 --- a/internal/controller/mantlerestore_controller.go +++ b/internal/controller/mantlerestore_controller.go @@ -22,7 +22,6 @@ import ( // MantleRestoreReconciler reconciles a MantleRestore object type MantleRestoreReconciler struct { client client.Client - reader client.Reader Scheme *runtime.Scheme managedCephClusterID string ceph ceph.CephCmd @@ -30,9 +29,14 @@ type MantleRestoreReconciler struct { } const ( - MantleRestoreFinalizerName = "mantlerestore.mantle.cybozu.io/finalizer" - PVAnnotationRestoredBy = "mantle.cybozu.io/restored-by" - PVCAnnotationRestoredBy = "mantle.cybozu.io/restored-by" + MantleRestoreFinalizerName = "mantlerestore.mantle.cybozu.io/finalizer" + RestoringPVFinalizerName = "mantle.cybozu.io/restoring-pv-finalizer" + PVAnnotationRestoredBy = "mantle.cybozu.io/restored-by" + PVAnnotationRestoredByName = "mantle.cybozu.io/restored-by-name" + PVAnnotationRestoredByNamespace = "mantle.cybozu.io/restored-by-namespace" + PVCAnnotationRestoredBy = "mantle.cybozu.io/restored-by" + labelRestoringPVKey = "mantle.cybozu.io/restoring-pv" + 
labelRestoringPVValue = "true" ) // +kubebuilder:rbac:groups=mantle.cybozu.io,resources=mantlerbackup,verbs=get;list;watch @@ -41,17 +45,16 @@ const ( // +kubebuilder:rbac:groups=mantle.cybozu.io,resources=mantlerestores/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=persistentvolumes/finalizers,verbs=update func NewMantleRestoreReconciler( client client.Client, - reader client.Reader, scheme *runtime.Scheme, managedCephClusterID, role string, ) *MantleRestoreReconciler { return &MantleRestoreReconciler{ client: client, - reader: reader, Scheme: scheme, managedCephClusterID: managedCephClusterID, ceph: ceph.NewCephCmd(), @@ -163,13 +166,13 @@ func (r *MantleRestoreReconciler) restore(ctx context.Context, restore *mantlev1 } // create a restore PV with the clone image - if err := r.createRestoringPV(ctx, restore, &backup); err != nil { + if err := r.createOrUpdateRestoringPV(ctx, restore, &backup); err != nil { logger.Error(err, "failed to create PV") return ctrl.Result{}, err } // create a restore PVC with the restore PV - if err := r.createRestoringPVC(ctx, restore, &backup); err != nil { + if err := r.createOrUpdateRestoringPVC(ctx, restore, &backup); err != nil { logger.Error(err, "failed to create PVC") return ctrl.Result{}, err } @@ -241,93 +244,93 @@ func (r *MantleRestoreReconciler) cloneImageFromBackup(ctx context.Context, rest return r.ceph.RBDClone(restore.Status.Pool, bkImage, backup.Name, r.restoringRBDImageName(restore), features) } -func (r *MantleRestoreReconciler) createRestoringPV(ctx context.Context, restore *mantlev1.MantleRestore, backup *mantlev1.MantleBackup) error { +func (r *MantleRestoreReconciler) createOrUpdateRestoringPV(ctx context.Context, restore *mantlev1.MantleRestore, backup *mantlev1.MantleBackup) error { pvName := r.restoringPVName(restore) restoredBy := string(restore.UID) - // check if the PV already exists - existingPV := corev1.PersistentVolume{} - if err := r.client.Get(ctx, client.ObjectKey{Name: pvName}, &existingPV); err != nil { - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to get existing PV: %v", err) + var pv corev1.PersistentVolume + pv.SetName(pvName) + _, err := ctrl.CreateOrUpdate(ctx, r.client, &pv, func() error { + if !pv.GetDeletionTimestamp().IsZero() { + return fmt.Errorf("the restoring PV already began to be deleted: %s", pvName) } - } else if existingPV.Annotations[PVAnnotationRestoredBy] != restoredBy { - return fmt.Errorf("existing PV is having different MantleRestore UID: %s, %s", pvName, existingPV.Annotations[PVAnnotationRestoredBy]) - } else { - // PV already exists and restored by the same MantleRestore - return nil - } + if pv.Annotations == nil { + pv.Annotations = map[string]string{} + } + if annot, ok := pv.Annotations[PVAnnotationRestoredBy]; ok && annot != restoredBy { + return fmt.Errorf("the existing PV is having different MantleRestore UID: %s, %s", + pvName, pv.Annotations[PVAnnotationRestoredBy]) + } + pv.Annotations[PVAnnotationRestoredBy] = restoredBy + pv.Annotations[PVAnnotationRestoredByName] = restore.GetName() + pv.Annotations[PVAnnotationRestoredByNamespace] = restore.GetNamespace() + + // get the source PV from the backup + srcPV := corev1.PersistentVolume{} + if err := json.Unmarshal([]byte(backup.Status.PVManifest), &srcPV); err != nil { + return 
fmt.Errorf("failed to unmarshal PV manifest: %w", err) + } - // get the source PV from the backup - srcPV := corev1.PersistentVolume{} - err := json.Unmarshal([]byte(backup.Status.PVManifest), &srcPV) - if err != nil { - return fmt.Errorf("failed to unmarshal PV manifest: %v", err) - } + controllerutil.AddFinalizer(&pv, RestoringPVFinalizerName) - // create a new restoring PV corresponding to a restoring RBD image - newPV := corev1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvName, - Annotations: map[string]string{ - PVAnnotationRestoredBy: restoredBy, - }, - }, - Spec: *srcPV.Spec.DeepCopy(), - } - newPV.Spec.ClaimRef = nil - newPV.Spec.CSI.VolumeAttributes = map[string]string{ - "clusterID": srcPV.Spec.CSI.VolumeAttributes["clusterID"], - "pool": srcPV.Spec.CSI.VolumeAttributes["pool"], - "staticVolume": "true", - "imageFeatures": srcPV.Spec.CSI.VolumeAttributes["imageFeatures"] + ",deep-flatten", - } - newPV.Spec.CSI.VolumeHandle = r.restoringRBDImageName(restore) - newPV.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain + if pv.Labels == nil { + pv.Labels = map[string]string{} + } + pv.Labels[labelRestoringPVKey] = labelRestoringPVValue + + pv.Spec = *srcPV.Spec.DeepCopy() + pv.Spec.ClaimRef = nil + pv.Spec.CSI.VolumeAttributes = map[string]string{ + "clusterID": srcPV.Spec.CSI.VolumeAttributes["clusterID"], + "pool": srcPV.Spec.CSI.VolumeAttributes["pool"], + "staticVolume": "true", + "imageFeatures": srcPV.Spec.CSI.VolumeAttributes["imageFeatures"] + ",deep-flatten", + } + pv.Spec.CSI.VolumeHandle = r.restoringRBDImageName(restore) + pv.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain - return r.client.Create(ctx, &newPV) + return nil + }) + + return err } -func (r *MantleRestoreReconciler) createRestoringPVC(ctx context.Context, restore *mantlev1.MantleRestore, backup *mantlev1.MantleBackup) error { +func (r *MantleRestoreReconciler) createOrUpdateRestoringPVC(ctx context.Context, restore *mantlev1.MantleRestore, backup *mantlev1.MantleBackup) error { pvcName := restore.Name pvcNamespace := restore.Namespace restoredBy := string(restore.UID) - // check if the PVC already exists - existingPVC := corev1.PersistentVolumeClaim{} - if err := r.client.Get(ctx, client.ObjectKey{Name: pvcName, Namespace: pvcNamespace}, &existingPVC); err != nil { - if !errors.IsNotFound(err) { - return fmt.Errorf("failed to get existing PVC: %v", err) + var pvc corev1.PersistentVolumeClaim + pvc.SetName(pvcName) + pvc.SetNamespace(pvcNamespace) + _, err := ctrl.CreateOrUpdate(ctx, r.client, &pvc, func() error { + if pvc.Annotations == nil { + pvc.Annotations = map[string]string{} } + if annot, ok := pvc.Annotations[PVCAnnotationRestoredBy]; ok && annot != restoredBy { + return fmt.Errorf("the existing PVC is having different MantleRestore UID: %s, %s", + pvcName, pvc.Annotations[PVCAnnotationRestoredBy]) + } + pvc.Annotations[PVCAnnotationRestoredBy] = restoredBy - } else if existingPVC.Annotations[PVCAnnotationRestoredBy] != restoredBy { - return fmt.Errorf("existing PVC is having different MantleRestore UID: %s, %s", pvcName, existingPVC.Annotations[PVCAnnotationRestoredBy]) - } else { - // PVC already exists and restored by the same MantleRestore - return nil - } + // get the source PVC from the backup + srcPVC := corev1.PersistentVolumeClaim{} + if err := json.Unmarshal([]byte(backup.Status.PVCManifest), &srcPVC); err != nil { + return fmt.Errorf("failed to unmarshal PVC manifest: %w", err) + } - // get the source PVC from the backup - 
srcPVC := corev1.PersistentVolumeClaim{}
-	err := json.Unmarshal([]byte(backup.Status.PVCManifest), &srcPVC)
-	if err != nil {
-		return fmt.Errorf("failed to unmarshal PVC manifest: %v", err)
-	}
+		pvc.Spec = *srcPVC.Spec.DeepCopy()
+		pvc.Spec.VolumeName = r.restoringPVName(restore)
-	newPVC := corev1.PersistentVolumeClaim{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      pvcName,
-			Namespace: pvcNamespace,
-			Annotations: map[string]string{
-				PVCAnnotationRestoredBy: restoredBy,
-			},
-		},
-		Spec: *srcPVC.Spec.DeepCopy(),
-	}
-	newPVC.Spec.VolumeName = r.restoringPVName(restore)
+		if err := controllerutil.SetControllerReference(restore, &pvc, r.Scheme); err != nil {
+			return fmt.Errorf("failed to set controller reference: %w", err)
+		}
-	return r.client.Create(ctx, &newPVC)
+		return nil
+	})
+
+	return err
 }
 
 func (r *MantleRestoreReconciler) cleanup(ctx context.Context, restore *mantlev1.MantleRestore) (ctrl.Result, error) {
@@ -352,12 +355,6 @@ func (r *MantleRestoreReconciler) cleanup(ctx context.Context, restore *mantlev1
 		return ctrl.Result{}, err
 	}
 
-	// delete the clone image
-	if err := r.removeRBDImage(ctx, restore); err != nil {
-		logger.Error(err, "failed to remove image")
-		return ctrl.Result{}, err
-	}
-
 	// remove the finalizer
 	controllerutil.RemoveFinalizer(restore, MantleRestoreFinalizerName)
 	err = r.client.Update(ctx, restore)
@@ -370,13 +367,11 @@ func (r *MantleRestoreReconciler) cleanup(ctx context.Context, restore *mantlev1
 }
 
 // deleteRestoringPVC deletes the restoring PVC.
-// To delete RBD image, it returns an error if it still exists to ensure no one uses the PVC.
-// Note: it must use reader rather than client to check PVC existence to avoid oversighting a PVC not in the cache.
 func (r *MantleRestoreReconciler) deleteRestoringPVC(ctx context.Context, restore *mantlev1.MantleRestore) error {
 	pvcName := restore.Name
 	pvcNamespace := restore.Namespace
 	pvc := corev1.PersistentVolumeClaim{}
-	if err := r.reader.Get(ctx, client.ObjectKey{Name: pvcName, Namespace: pvcNamespace}, &pvc); err != nil {
+	if err := r.client.Get(ctx, client.ObjectKey{Name: pvcName, Namespace: pvcNamespace}, &pvc); err != nil {
 		if errors.IsNotFound(err) {
 			return nil
 		}
@@ -391,23 +386,17 @@ func (r *MantleRestoreReconciler) deleteRestoringPVC(ctx context.Context, restor
 		return fmt.Errorf("failed to delete PVC: %v", err)
 	}
 
-	if err := r.reader.Get(ctx, client.ObjectKey{Name: pvcName, Namespace: pvcNamespace}, &pvc); err != nil {
-		if errors.IsNotFound(err) {
-			return nil
-		}
-		return fmt.Errorf("failed to get PVC: %v", err)
-	} else {
-		return fmt.Errorf("PVC still exists: %s", pvcName)
-	}
+	return nil
 }
 
 // deleteRestoringPV deletes the restoring PV.
-// To delete RBD image, it returns an error if it still exists to ensure no one uses the PV.
-// Note: it must use reader rather than client to check PVC existence to avoid oversighting a PV not in the cache.
 func (r *MantleRestoreReconciler) deleteRestoringPV(ctx context.Context, restore *mantlev1.MantleRestore) error {
 	pv := corev1.PersistentVolume{}
-	if err := r.reader.Get(ctx, client.ObjectKey{Name: r.restoringPVName(restore)}, &pv); err != nil {
+	if err := r.client.Get(ctx, client.ObjectKey{Name: r.restoringPVName(restore)}, &pv); err != nil {
 		if errors.IsNotFound(err) {
+			// NOTE: Since the cache of the client may be stale, we may
+			// overlook some PVs that should be removed here. Such PVs will be
+			// removed by GarbageCollectorRunner.
return nil } return fmt.Errorf("failed to get PV: %v", err) @@ -421,31 +410,7 @@ func (r *MantleRestoreReconciler) deleteRestoringPV(ctx context.Context, restore return fmt.Errorf("failed to delete PV: %v", err) } - if err := r.reader.Get(ctx, client.ObjectKey{Name: r.restoringPVName(restore)}, &pv); err != nil { - if errors.IsNotFound(err) { - return nil - } - return fmt.Errorf("failed to get PV: %v", err) - } else { - return fmt.Errorf("PV still exists: %s", pv.Name) - } -} - -func (r *MantleRestoreReconciler) removeRBDImage(ctx context.Context, restore *mantlev1.MantleRestore) error { - logger := log.FromContext(ctx) - image := r.restoringRBDImageName(restore) - pool := restore.Status.Pool - logger.Info("removing image", "pool", pool, "image", image) - images, err := r.ceph.RBDLs(pool) - if err != nil { - return fmt.Errorf("failed to list RBD images: %v", err) - } - - if !slices.Contains(images, image) { - return nil - } - - return r.ceph.RBDRm(pool, image) + return nil } // SetupWithManager sets up the controller with the Manager. diff --git a/internal/controller/mantlerestore_controller_e2e.go b/internal/controller/mantlerestore_controller_e2e.go index ab87062d..663b1eef 100644 --- a/internal/controller/mantlerestore_controller_e2e.go +++ b/internal/controller/mantlerestore_controller_e2e.go @@ -25,7 +25,3 @@ func NewMantleRestoreReconcilerE2E(managedCephClusterID, toolsNamespace string) func (r *MantleRestoreReconcilerE2E) CloneImageFromBackup(ctx context.Context, restore *mantlev1.MantleRestore, backup *mantlev1.MantleBackup) error { return r.cloneImageFromBackup(ctx, restore, backup) } - -func (r *MantleRestoreReconcilerE2E) RemoveRBDImage(ctx context.Context, restore *mantlev1.MantleRestore) error { - return r.removeRBDImage(ctx, restore) -} diff --git a/internal/controller/mantlerestore_controller_test.go b/internal/controller/mantlerestore_controller_test.go index 44a727b6..8d73063f 100644 --- a/internal/controller/mantlerestore_controller_test.go +++ b/internal/controller/mantlerestore_controller_test.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) type mantleRestoreControllerUnitTest struct { @@ -66,7 +67,6 @@ func (test *mantleRestoreControllerUnitTest) setupEnv() { // just allocate the reconciler, and does not start it. 
test.reconciler = NewMantleRestoreReconciler( test.mgrUtil.GetManager().GetClient(), - test.mgrUtil.GetManager().GetAPIReader(), test.mgrUtil.GetManager().GetScheme(), resMgr.ClusterID, RoleStandalone, @@ -144,7 +144,7 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPV() { }) It("should create a correct PV", func(ctx SpecContext) { - err := test.reconciler.createRestoringPV(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPV(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) err = k8sClient.Get(ctx, client.ObjectKey{Name: fmt.Sprintf("mr-%s-%s", test.tenantNamespace, restore.Name)}, &pv1) @@ -168,7 +168,7 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPV() { }) It("should skip creating a PV if it already exists", func(ctx SpecContext) { - err := test.reconciler.createRestoringPV(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPV(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) By("PV should not be updated") @@ -187,7 +187,7 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPV() { restoreDifferent := restore.DeepCopy() restoreDifferent.UID = types.UID(util.GetUniqueName("uid-")) - err := test.reconciler.createRestoringPV(ctx, restoreDifferent, test.backup) + err := test.reconciler.createOrUpdateRestoringPV(ctx, restoreDifferent, test.backup) Expect(err).To(HaveOccurred()) By("PV should not be updated") @@ -212,7 +212,7 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPVC() { }) It("should create a correct PVC", func(ctx SpecContext) { - err := test.reconciler.createRestoringPVC(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPVC(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) err = k8sClient.Get(ctx, client.ObjectKey{Name: restore.Name, Namespace: test.tenantNamespace}, &pvc1) @@ -225,10 +225,14 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPVC() { Expect(pvc1.Spec.StorageClassName).To(Equal(test.srcPVC.Spec.StorageClassName)) Expect(pvc1.Spec.VolumeMode).To(Equal(test.srcPVC.Spec.VolumeMode)) Expect(pvc1.Spec.VolumeName).To(Equal(fmt.Sprintf("mr-%s-%s", test.tenantNamespace, restore.Name))) + Expect(controllerutil.HasControllerReference(&pvc1)).To(BeTrue()) + Expect(*pvc1.GetObjectMeta().GetOwnerReferences()[0].Controller).To(BeTrue()) + Expect(pvc1.GetObjectMeta().GetOwnerReferences()[0].Kind).To(Equal("MantleRestore")) + Expect(pvc1.GetObjectMeta().GetOwnerReferences()[0].UID).To(Equal(restore.GetUID())) }) It("should skip creating a PVC if it already exists", func(ctx SpecContext) { - err := test.reconciler.createRestoringPVC(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPVC(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) By("PVC should not be updated") @@ -247,7 +251,7 @@ func (test *mantleRestoreControllerUnitTest) testCreateRestoringPVC() { restoreDifferent := restore.DeepCopy() restoreDifferent.UID = types.UID(util.GetUniqueName("uid-")) - err := test.reconciler.createRestoringPVC(ctx, restoreDifferent, test.backup) + err := test.reconciler.createOrUpdateRestoringPVC(ctx, restoreDifferent, test.backup) Expect(err).To(HaveOccurred()) By("PVC should not be updated") @@ -273,22 +277,24 @@ func (test *mantleRestoreControllerUnitTest) testDeleteRestoringPVC() { It("should delete the PVC", func(ctx SpecContext) { var pvc corev1.PersistentVolumeClaim - err := test.reconciler.createRestoringPVC(ctx, restore, test.backup) + err := 
test.reconciler.createOrUpdateRestoringPVC(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) - // remove pvc-protection finalizer from PVC to allow deletion + err = test.reconciler.deleteRestoringPVC(ctx, restore) + Expect(err).NotTo(HaveOccurred()) + + // remove finalizers from PVC to allow deletion err = k8sClient.Get(ctx, client.ObjectKey{Name: restore.Name, Namespace: test.tenantNamespace}, &pvc) Expect(err).NotTo(HaveOccurred()) pvc.Finalizers = nil err = k8sClient.Update(ctx, &pvc) Expect(err).NotTo(HaveOccurred()) - err = test.reconciler.deleteRestoringPVC(ctx, restore) - Expect(err).NotTo(HaveOccurred()) - - err = k8sClient.Get(ctx, client.ObjectKey{Name: restore.Name, Namespace: test.tenantNamespace}, &pvc) - Expect(err).To(HaveOccurred()) - Expect(errors.IsNotFound(err)).To(BeTrue()) + Eventually(ctx, func(g Gomega) { + err = k8sClient.Get(ctx, client.ObjectKey{Name: restore.Name, Namespace: test.tenantNamespace}, &pvc) + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.IsNotFound(err)).To(BeTrue()) + }).Should(Succeed()) }) It("should skip deleting the PVC if it does not exist", func(ctx SpecContext) { @@ -301,31 +307,23 @@ func (test *mantleRestoreControllerUnitTest) testDeleteRestoringPVC() { restoreDifferent := restore.DeepCopy() restoreDifferent.UID = types.UID(util.GetUniqueName("uid-")) - err := test.reconciler.createRestoringPVC(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPVC(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) - // remove pvc-protection finalizer from PVC to allow deletion + err = test.reconciler.deleteRestoringPVC(ctx, restoreDifferent) + Expect(err).To(HaveOccurred()) + + // remove finalizers from PVC to allow deletion err = k8sClient.Get(ctx, client.ObjectKey{Name: restore.Name, Namespace: test.tenantNamespace}, &pvc) Expect(err).NotTo(HaveOccurred()) pvc.Finalizers = nil err = k8sClient.Update(ctx, &pvc) Expect(err).NotTo(HaveOccurred()) - err = test.reconciler.deleteRestoringPVC(ctx, restoreDifferent) - Expect(err).To(HaveOccurred()) - // cleanup err = test.reconciler.deleteRestoringPVC(ctx, restore) Expect(err).NotTo(HaveOccurred()) }) - - It("should return an error, if the PVC having finalizer", func(ctx SpecContext) { - err := test.reconciler.createRestoringPVC(ctx, restore, test.backup) - Expect(err).NotTo(HaveOccurred()) - - err = test.reconciler.deleteRestoringPVC(ctx, restore) - Expect(err).To(HaveOccurred()) - }) } func (test *mantleRestoreControllerUnitTest) testDeleteRestoringPV() { @@ -338,19 +336,19 @@ func (test *mantleRestoreControllerUnitTest) testDeleteRestoringPV() { It("should delete the PV", func(ctx SpecContext) { var pv corev1.PersistentVolume - err := test.reconciler.createRestoringPV(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPV(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) - // remove pv-protection finalizer from PV to allow deletion + err = test.reconciler.deleteRestoringPV(ctx, restore) + Expect(err).NotTo(HaveOccurred()) + + // remove finalizers from PV to allow deletion err = k8sClient.Get(ctx, client.ObjectKey{Name: test.reconciler.restoringPVName(restore)}, &pv) Expect(err).NotTo(HaveOccurred()) pv.Finalizers = nil err = k8sClient.Update(ctx, &pv) Expect(err).NotTo(HaveOccurred()) - err = test.reconciler.deleteRestoringPV(ctx, restore) - Expect(err).NotTo(HaveOccurred()) - err = k8sClient.Get(ctx, client.ObjectKey{Name: test.reconciler.restoringPVName(restore)}, &pv) Expect(err).To(HaveOccurred()) 
Expect(errors.IsNotFound(err)).To(BeTrue()) @@ -366,31 +364,23 @@ func (test *mantleRestoreControllerUnitTest) testDeleteRestoringPV() { restoreDifferent := restore.DeepCopy() restoreDifferent.UID = types.UID(util.GetUniqueName("uid-")) - err := test.reconciler.createRestoringPV(ctx, restore, test.backup) + err := test.reconciler.createOrUpdateRestoringPV(ctx, restore, test.backup) Expect(err).NotTo(HaveOccurred()) - // remove pv-protection finalizer from PV to allow deletion + err = test.reconciler.deleteRestoringPV(ctx, restoreDifferent) + Expect(err).To(HaveOccurred()) + + // remove finalizers from PV to allow deletion err = k8sClient.Get(ctx, client.ObjectKey{Name: test.reconciler.restoringPVName(restore)}, &pv) Expect(err).NotTo(HaveOccurred()) pv.Finalizers = nil err = k8sClient.Update(ctx, &pv) Expect(err).NotTo(HaveOccurred()) - err = test.reconciler.deleteRestoringPV(ctx, restoreDifferent) - Expect(err).To(HaveOccurred()) - // cleanup err = test.reconciler.deleteRestoringPV(ctx, restore) Expect(err).NotTo(HaveOccurred()) }) - - It("should return an error, if the PV having finalizer", func(ctx SpecContext) { - err := test.reconciler.createRestoringPV(ctx, restore, test.backup) - Expect(err).NotTo(HaveOccurred()) - - err = test.reconciler.deleteRestoringPV(ctx, restore) - Expect(err).To(HaveOccurred()) - }) } // helper function to get MantleRestore object diff --git a/internal/controller/persistentvolume_controller.go b/internal/controller/persistentvolume_controller.go new file mode 100644 index 00000000..6c9d28a5 --- /dev/null +++ b/internal/controller/persistentvolume_controller.go @@ -0,0 +1,143 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "slices" + + "github.com/cybozu-go/mantle/internal/ceph" + corev1 "k8s.io/api/core/v1" + aerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// PersistentVolumeReconciler reconciles a PersistentVolume object +type PersistentVolumeReconciler struct { + client client.Client + Scheme *runtime.Scheme + ceph ceph.CephCmd + managedCephClusterID string +} + +// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=persistentvolumes/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=persistentvolumes/finalizers,verbs=update + +func NewPersistentVolumeReconciler( + client client.Client, + scheme *runtime.Scheme, + managedCephClusterID string, +) *PersistentVolumeReconciler { + return &PersistentVolumeReconciler{ + client: client, + Scheme: scheme, + ceph: ceph.NewCephCmd(), + managedCephClusterID: managedCephClusterID, + } +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the PersistentVolume object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. 
+// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.3/pkg/reconcile +func (r *PersistentVolumeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + // Get the PV being reconciled. + var pv corev1.PersistentVolume + if err := r.client.Get(ctx, req.NamespacedName, &pv); err != nil { + if aerrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get PersistentVolume: %w", err) + } + + // Check if the PV is managed by the target Ceph cluster. + clusterID, err := getCephClusterIDFromSCName(ctx, r.client, pv.Spec.StorageClassName) + if err != nil { + if errors.Is(err, errEmptyClusterID) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + if clusterID != r.managedCephClusterID { + logger.Info("PV is not provisioned by the target Ceph cluster", "pv", pv.Name, "clusterID", clusterID) + return ctrl.Result{}, nil + } + + // Make sure the PV has the finalizer. + if !controllerutil.ContainsFinalizer(&pv, RestoringPVFinalizerName) { + return ctrl.Result{}, nil + } + + // Make sure the PV has a deletionTimestamp. + if pv.GetDeletionTimestamp().IsZero() { + return ctrl.Result{}, nil + } + + // Wait until the PV's status becomes Released. + if pv.Status.Phase != corev1.VolumeReleased { + return ctrl.Result{Requeue: true}, nil + } + + // Delete the RBD clone image. + if err := r.removeRBDImage(ctx, &pv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to remove RBD image: %s: %w", pv.Name, err) + } + + // Remove the finalizer of the PV. + controllerutil.RemoveFinalizer(&pv, RestoringPVFinalizerName) + if err := r.client.Update(ctx, &pv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to remove finalizer from PersistentVolume: %s: %s: %w", RestoringPVFinalizerName, pv.Name, err) + } + + logger.Info("finalize PV successfully", "pvName", pv.Name) + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *PersistentVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.PersistentVolume{}). + WithEventFilter(predicate.Funcs{ + CreateFunc: func(event.CreateEvent) bool { return true }, + UpdateFunc: func(event.UpdateEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return true }, + DeleteFunc: func(ev event.DeleteEvent) bool { + return !controllerutil.ContainsFinalizer(ev.Object, RestoringPVFinalizerName) + }, + }). 
+ Complete(r) +} + +func (r *PersistentVolumeReconciler) removeRBDImage(ctx context.Context, pv *corev1.PersistentVolume) error { + logger := log.FromContext(ctx) + + image := pv.Spec.CSI.VolumeHandle + pool := pv.Spec.CSI.VolumeAttributes["pool"] + logger.Info("removing image", "pool", pool, "image", image) + + images, err := r.ceph.RBDLs(pool) + if err != nil { + return fmt.Errorf("failed to list RBD images: %v", err) + } + + if !slices.Contains(images, image) { + return nil + } + + return r.ceph.RBDRm(pool, image) +} diff --git a/internal/controller/persistentvolume_controller_e2e.go b/internal/controller/persistentvolume_controller_e2e.go new file mode 100644 index 00000000..61628f02 --- /dev/null +++ b/internal/controller/persistentvolume_controller_e2e.go @@ -0,0 +1,26 @@ +package controller + +import ( + "context" + + "github.com/cybozu-go/mantle/internal/ceph" + corev1 "k8s.io/api/core/v1" +) + +// PersistentVolumeReconcilerE2E is a wrapper of PersistentVolumeReconciler. +// This module is used to test removeRBDImage in e2e tests. +type PersistentVolumeReconcilerE2E struct { + PersistentVolumeReconciler +} + +func NewPersistentVolumeReconcilerE2E(toolsNamespace string) *PersistentVolumeReconcilerE2E { + return &PersistentVolumeReconcilerE2E{ + PersistentVolumeReconciler{ + ceph: ceph.NewCephCmdWithTools(toolsNamespace), + }, + } +} + +func (r *PersistentVolumeReconcilerE2E) RemoveRBDImage(ctx context.Context, pv *corev1.PersistentVolume) error { + return r.removeRBDImage(ctx, pv) +} diff --git a/internal/controller/util.go b/internal/controller/util.go index c7b90a7a..3fa2d75c 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -2,6 +2,8 @@ package controller import ( "context" + "errors" + "fmt" "strings" batchv1 "k8s.io/api/batch/v1" @@ -12,30 +14,46 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -func getCephClusterIDFromPVC(ctx context.Context, k8sClient client.Client, pvc *corev1.PersistentVolumeClaim) (string, error) { - logger := log.FromContext(ctx) - storageClassName := pvc.Spec.StorageClassName - if storageClassName == nil { - logger.Info("not managed storage class", "namespace", pvc.Namespace, "pvc", pvc.Name) - return "", nil - } +var errEmptyClusterID error = errors.New("cluster ID is empty") + +func getCephClusterIDFromSCName(ctx context.Context, k8sClient client.Client, storageClassName string) (string, error) { var storageClass storagev1.StorageClass - err := k8sClient.Get(ctx, types.NamespacedName{Name: *storageClassName}, &storageClass) + err := k8sClient.Get(ctx, types.NamespacedName{Name: storageClassName}, &storageClass) if err != nil { - return "", err + return "", fmt.Errorf("failed to get StorageClass: %s: %w", storageClassName, err) } // Check if the MantleBackup resource being reconciled is managed by the CephCluster we are in charge of. 
if !strings.HasSuffix(storageClass.Provisioner, ".rbd.csi.ceph.com") { - logger.Info("SC is not managed by RBD", "namespace", pvc.Namespace, "pvc", pvc.Name, "storageClassName", *storageClassName) - return "", nil + return "", fmt.Errorf("SC is not managed by RBD: %s: %w", storageClassName, errEmptyClusterID) } clusterID, ok := storageClass.Parameters["clusterID"] if !ok { - logger.Info("clusterID not found", "namespace", pvc.Namespace, "pvc", pvc.Name, "storageClassName", *storageClassName) + return "", fmt.Errorf("clusterID not found: %s: %w", storageClassName, errEmptyClusterID) + } + + return clusterID, nil +} + +func getCephClusterIDFromPVC(ctx context.Context, k8sClient client.Client, pvc *corev1.PersistentVolumeClaim) (string, error) { + logger := log.FromContext(ctx) + + storageClassName := pvc.Spec.StorageClassName + if storageClassName == nil { + logger.Info("not managed storage class", "namespace", pvc.Namespace, "pvc", pvc.Name) return "", nil } + clusterID, err := getCephClusterIDFromSCName(ctx, k8sClient, *storageClassName) + if err != nil { + logger.Info("failed to get ceph cluster ID from StorageClass name", + "error", err, "namespace", pvc.Namespace, "pvc", pvc.Name, "storageClassName", *storageClassName) + if errors.Is(err, errEmptyClusterID) { + return "", nil + } + return "", err + } + return clusterID, nil } diff --git a/test/e2e/singlek8s/multi_rook_ceph_test.go b/test/e2e/singlek8s/multi_rook_ceph_test.go index e0ce62ed..906f77f8 100644 --- a/test/e2e/singlek8s/multi_rook_ceph_test.go +++ b/test/e2e/singlek8s/multi_rook_ceph_test.go @@ -181,14 +181,16 @@ func (test *multiRookCephTest) testMain() { }) It("check the rbd images in the both clusters", func() { - imageAndSnaps1, err := getImageAndSnapNames(cephCluster1Namespace, test.poolName) - Expect(err).NotTo(HaveOccurred()) - // after delete backup and restore, only the source image should be left - Expect(imageAndSnaps1).To(ConsistOf(srcImageName1)) - - imageAndSnaps2, err := getImageAndSnapNames(cephCluster2Namespace, test.poolName) - Expect(err).NotTo(HaveOccurred()) - Expect(imageAndSnaps2).To(ConsistOf(append(expectImageAndSnaps1, expectImageAndSnaps2...))) + Eventually(func(g Gomega) { + imageAndSnaps1, err := getImageAndSnapNames(cephCluster1Namespace, test.poolName) + g.Expect(err).NotTo(HaveOccurred()) + // after delete backup and restore, only the source image should be left + g.Expect(imageAndSnaps1).To(ConsistOf(srcImageName1)) + + imageAndSnaps2, err := getImageAndSnapNames(cephCluster2Namespace, test.poolName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(imageAndSnaps2).To(ConsistOf(append(expectImageAndSnaps1, expectImageAndSnaps2...))) + }).Should(Succeed()) }) It("delete the resources in the tenant namespace 2", func() { @@ -196,13 +198,15 @@ func (test *multiRookCephTest) testMain() { }) It("check the rbd images in the both clusters", func() { - imageAndSnaps1, err := getImageAndSnapNames(cephCluster1Namespace, test.poolName) - Expect(err).NotTo(HaveOccurred()) - Expect(imageAndSnaps1).To(ConsistOf(srcImageName1)) - - imageAndSnaps2, err := getImageAndSnapNames(cephCluster2Namespace, test.poolName) - Expect(err).NotTo(HaveOccurred()) - Expect(imageAndSnaps2).To(ConsistOf(append(expectImageAndSnaps1, srcImageName2))) + Eventually(func(g Gomega) { + imageAndSnaps1, err := getImageAndSnapNames(cephCluster1Namespace, test.poolName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(imageAndSnaps1).To(ConsistOf(srcImageName1)) + + imageAndSnaps2, err := getImageAndSnapNames(cephCluster2Namespace, 
test.poolName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(imageAndSnaps2).To(ConsistOf(append(expectImageAndSnaps1, srcImageName2))) + }).Should(Succeed()) }) It("check there is no change in the dummy rbd image", func() { diff --git a/test/e2e/singlek8s/restore_test.go b/test/e2e/singlek8s/restore_test.go index 813be194..bc1e9f26 100644 --- a/test/e2e/singlek8s/restore_test.go +++ b/test/e2e/singlek8s/restore_test.go @@ -253,27 +253,30 @@ func (test *restoreTest) testCleanup() { _, _, err = kubectl("delete", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) Expect(err).NotTo(HaveOccurred()) - By("checking if the PVC is deleted") - _, _, err = kubectl("get", "pvc", "-n", test.tenantNamespace, test.mantleRestoreName1) + By("checking if the MantleRestore is deleted") + _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) Expect(err).To(HaveOccurred()) - By("checking if the PV is deleted") - _, _, err = kubectl("get", "pv", fmt.Sprintf("mr-%s-%s", test.tenantNamespace, test.mantleRestoreName1)) - Expect(err).To(HaveOccurred()) + Eventually(func(g Gomega) { + By("checking if the PVC is deleted") + _, _, err = kubectl("get", "pvc", "-n", test.tenantNamespace, test.mantleRestoreName1) + g.Expect(err).To(HaveOccurred()) - By("checking if the clone image is deleted") - _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName) - Expect(err).To(HaveOccurred()) + By("checking if the PV is deleted") + _, _, err = kubectl("get", "pv", fmt.Sprintf("mr-%s-%s", test.tenantNamespace, test.mantleRestoreName1)) + g.Expect(err).To(HaveOccurred()) - By("checking if the MantleRestore is deleted") - _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - Expect(err).To(HaveOccurred()) + By("checking if the clone image is deleted") + _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName) + g.Expect(err).To(HaveOccurred()) + }).Should(Succeed()) }) - It("should wait while the PV/PVC is in used", func() { + It("should NOT delete an RBD image while its corresponding PV and PVC are in use", func() { test.cleanup() imageName := fmt.Sprintf("mantle-%s-%s", test.tenantNamespace, test.mantleRestoreName1) podName := util.GetUniqueName("pod-") + pvName := fmt.Sprintf("mr-%s-%s", test.tenantNamespace, test.mantleRestoreName1) err := applyPVCTemplate(test.tenantNamespace, test.pvcName, test.storageClassName) Expect(err).NotTo(HaveOccurred()) @@ -292,18 +295,19 @@ func (test *restoreTest) testCleanup() { _, _, err = kubectl("wait", "--for=condition=Ready", "pod", podName, "-n", test.tenantNamespace, "--timeout=1m") Expect(err).NotTo(HaveOccurred()) - By("deleting the MantleRestore background") - go func() { - _, _, err = kubectl("delete", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - Expect(err).NotTo(HaveOccurred()) - }() - - By("checking the resources are exist while the PVC is in used") - Consistently(func() error { + By("deleting the MantleRestore") + _, _, err = kubectl("delete", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) + Expect(err).NotTo(HaveOccurred()) + By("checking the MantleRestore is deleted") + Eventually(func(g Gomega) { _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - if err != nil { - return err - } + g.Expect(err).To(HaveOccurred()) + }).Should(Succeed()) + + By("checking the PV and RBD image exists while the PVC is in use") + Consistently(func(g Gomega) error { + _, _, 
err := kubectl("get", "pv", pvName) + g.Expect(err).NotTo(HaveOccurred()) _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName) return err @@ -313,66 +317,14 @@ func (test *restoreTest) testCleanup() { _, _, err = kubectl("delete", "pod", "-n", test.tenantNamespace, podName) Expect(err).NotTo(HaveOccurred()) - By("checking the resources are deleted after the PVC is released") - Eventually(func() error { - _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - return err - }).Should(HaveOccurred()) - }) - - It("should fail to delete the MantleRestore if failed to remove rbd image", func() { - imageName := fmt.Sprintf("mantle-%s-%s", test.tenantNamespace, test.mantleRestoreName1) - - test.cleanup() - - err := applyPVCTemplate(test.tenantNamespace, test.pvcName, test.storageClassName) - Expect(err).NotTo(HaveOccurred()) - err = applyMantleBackupTemplate(test.tenantNamespace, test.pvcName, test.mantleBackupName1) - Expect(err).NotTo(HaveOccurred()) - err = applyMantleRestoreTemplate(test.tenantNamespace, test.mantleRestoreName1, test.mantleBackupName1) - Expect(err).NotTo(HaveOccurred()) - - Eventually(func() bool { - return isMantleRestoreReady(test.tenantNamespace, test.mantleRestoreName1) - }).Should(BeTrue()) - - // wait for the reconcile to finish - time.Sleep(10 * time.Second) - - By("edit the pool name to a non-exist pool") - _, _, err = kubectl("patch", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1, - "--subresource", "status", "--type", "json", - "-p", `[{"op": "replace", "path": "/status/pool", "value": "non-exist-pool"}]`) - Expect(err).NotTo(HaveOccurred()) - - By("deleting the first MantleRestore") - go func() { - _, _, err = kubectl("delete", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - Expect(err).NotTo(HaveOccurred()) - }() - - By("checking the MantleRestore is not deleted") - Consistently(func() error { - _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - if err != nil { - return err - } + By("checking the PV and RBD image is deleted") + Eventually(func(g Gomega) { + _, _, err := kubectl("get", "pv", pvName) + g.Expect(err).To(HaveOccurred()) _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName) - return err - }, 30*time.Second).ShouldNot(HaveOccurred()) - - By("revert pool name to original") - _, _, err = kubectl("patch", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1, - "--subresource", "status", "--type", "json", "-p", - fmt.Sprintf(`[{"op": "replace", "path": "/status/pool", "value": "%s"}]`, test.poolName)) - Expect(err).NotTo(HaveOccurred()) - - By("checking the MantleRestore is deleted") - Eventually(func() error { - _, _, err = kubectl("get", "mantlerestore", "-n", test.tenantNamespace, test.mantleRestoreName1) - return err - }).Should(HaveOccurred()) + g.Expect(err).To(HaveOccurred()) + }).Should(Succeed()) }) } @@ -432,8 +384,10 @@ func (test *restoreTest) testRestoreWithMultipleBackups() { Expect(err).NotTo(HaveOccurred()) By("checking the second restore is deleted") - _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName2) - Expect(err).To(HaveOccurred()) + Eventually(func(g Gomega) { + _, err = getRBDInfo(cephCluster1Namespace, test.poolName, imageName2) + g.Expect(err).To(HaveOccurred()) + }).Should(Succeed()) By("checking the first restore PV/PVC and image should still exist") _, _, err = kubectl("get", "pv", fmt.Sprintf("mr-%s-%s", test.tenantNamespace, 
test.mantleRestoreName1)) @@ -555,18 +509,17 @@ func (test *restoreTest) testCloneImageFromBackup() { func (test *restoreTest) testRemoveImage() { cloneImageName := fmt.Sprintf("mantle-%s-%s", test.tenantNamespace, test.mantleRestoreName1) - reconciler := controller.NewMantleRestoreReconcilerE2E(cephCluster1Namespace, cephCluster1Namespace) - restore := &mantlev1.MantleRestore{ - ObjectMeta: metav1.ObjectMeta{ - Name: test.mantleRestoreName1, - Namespace: test.tenantNamespace, - }, - Spec: mantlev1.MantleRestoreSpec{ - Backup: test.mantleBackupName1, - }, - Status: mantlev1.MantleRestoreStatus{ - ClusterID: cephCluster1Namespace, - Pool: test.poolName, + pvReconciler := controller.NewPersistentVolumeReconcilerE2E(cephCluster1Namespace) + pv := &corev1.PersistentVolume{ + Spec: corev1.PersistentVolumeSpec{ + PersistentVolumeSource: corev1.PersistentVolumeSource{ + CSI: &corev1.CSIPersistentVolumeSource{ + VolumeHandle: cloneImageName, + VolumeAttributes: map[string]string{ + "pool": test.poolName, + }, + }, + }, }, } @@ -575,7 +528,7 @@ func (test *restoreTest) testRemoveImage() { _, err := getRBDInfo(cephCluster1Namespace, test.poolName, cloneImageName) Expect(err).NotTo(HaveOccurred()) - err = reconciler.RemoveRBDImage(ctx, restore) + err = pvReconciler.RemoveRBDImage(ctx, pv) Expect(err).NotTo(HaveOccurred()) // should get an error since the image is removed @@ -584,7 +537,7 @@ func (test *restoreTest) testRemoveImage() { }) It("should skip removing the image if it does not exist", func(ctx SpecContext) { - err := reconciler.RemoveRBDImage(ctx, restore) + err := pvReconciler.RemoveRBDImage(ctx, pv) Expect(err).NotTo(HaveOccurred()) }) }) diff --git a/test/e2e/singlek8s/util.go b/test/e2e/singlek8s/util.go index 46105211..62c862cd 100644 --- a/test/e2e/singlek8s/util.go +++ b/test/e2e/singlek8s/util.go @@ -423,7 +423,7 @@ func writeTestData(namespace, pvc string, data []byte) error { return err } - _, _, err := kubectl("wait", "--for=condition=Ready", "pod", podName, "-n", namespace, "--timeout=1m") + _, _, err := kubectl("wait", "--for=condition=Ready", "pod", podName, "-n", namespace, "--timeout=5m") if err != nil { return fmt.Errorf("kubectl wait pod failed. 
err: %w", err) } diff --git a/test/e2e/testdata/values-mantle-primary-template.yaml b/test/e2e/testdata/values-mantle-primary-template.yaml index 838aa7b5..cbcee13f 100644 --- a/test/e2e/testdata/values-mantle-primary-template.yaml +++ b/test/e2e/testdata/values-mantle-primary-template.yaml @@ -5,3 +5,4 @@ controller: objectStorageEndpoint: {OBJECT_STORAGE_ENDPOINT} envSecret: export-data exportDataStorageClass: rook-ceph-block + gcInterval: 1s diff --git a/test/e2e/testdata/values-mantle-secondary-template.yaml b/test/e2e/testdata/values-mantle-secondary-template.yaml index 560d8a6c..cc2c857d 100644 --- a/test/e2e/testdata/values-mantle-secondary-template.yaml +++ b/test/e2e/testdata/values-mantle-secondary-template.yaml @@ -6,6 +6,7 @@ controller: objectStorageBucketName: {OBJECT_STORAGE_BUCKET_NAME} objectStorageEndpoint: {OBJECT_STORAGE_ENDPOINT} envSecret: export-data + gcInterval: 1s secondaryService: type: NodePort diff --git a/test/e2e/testdata/values-mantle1.yaml b/test/e2e/testdata/values-mantle1.yaml index 8ab3f446..97bc36de 100644 --- a/test/e2e/testdata/values-mantle1.yaml +++ b/test/e2e/testdata/values-mantle1.yaml @@ -1,2 +1,3 @@ controller: overwriteMBCSchedule: "* * * * *" + gcInterval: 1s diff --git a/test/e2e/testdata/values-mantle2.yaml b/test/e2e/testdata/values-mantle2.yaml index e69de29b..755d6e90 100644 --- a/test/e2e/testdata/values-mantle2.yaml +++ b/test/e2e/testdata/values-mantle2.yaml @@ -0,0 +1,2 @@ +controller: + gcInterval: 1s