Skip to content

Commit

Permalink
Merge pull request #71 from cybozu-go/avoid-using-reader
Browse files Browse the repository at this point in the history
stop using reader in MantleRestoreReconciler
  • Loading branch information
satoru-takeuchi authored Dec 4, 2024
2 parents 3121dde + 132a4d0 commit 0ccc41e
Show file tree
Hide file tree
Showing 21 changed files with 768 additions and 300 deletions.
4 changes: 4 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ resources:
kind: MantleBackupConfig
path: github.com/cybozu-go/mantle/api/v1
version: v1
- controller: true
domain: cybozu.io
kind: PersistentVolume
version: v1
version: "3"
14 changes: 14 additions & 0 deletions charts/mantle-cluster-wide/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- persistentvolumes/finalizers
verbs:
- update
- apiGroups:
- ""
resources:
- persistentvolumes/status
verbs:
- get
- patch
- update
- apiGroups:
- batch
resources:
Expand Down
3 changes: 3 additions & 0 deletions charts/mantle/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ spec:
{{- with .Values.controller.exportDataStorageClass }}
- --export-data-storage-class={{ . }}
{{- end }}
{{- with .Values.controller.gcInterval }}
- --gc-interval={{ . }}
{{- end }}
env:
- name: POD_NAME
valueFrom:
Expand Down
32 changes: 31 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
objectStorageEndpoint string
caCertConfigMapSrc string
caCertKeySrc string
gcInterval string

scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
Expand Down Expand Up @@ -95,6 +96,8 @@ func init() {
flags.StringVar(&caCertKeySrc, "ca-cert-key", "ca.crt",
"The key of the ConfigMap specified by --ca-cert-config-map that contains the intermediate certificate. "+
"The default value is ca.crt. This option is just ignored if --ca-cert-configmap isn't specified.")
flags.StringVar(&gcInterval, "gc-interval", "1h",
"The time period between each garbage collection for orphaned resources.")

goflags := flag.NewFlagSet("goflags", flag.ExitOnError)
zapOpts.Development = true
Expand Down Expand Up @@ -151,6 +154,17 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
caCertConfigMap = &caCertConfigMapSrc
}

parsedGCInterval, err := time.ParseDuration(gcInterval)
if err != nil {
setupLog.Error(err, "faield to parse gc interval", "gcInterval", gcInterval)
return err
}
if parsedGCInterval < 1*time.Second {
err := fmt.Errorf("the specified gc interval is too short: %s", parsedGCInterval.String())
setupLog.Error(err, "failed to validate gc interval", "gcInterval", gcInterval)
return err
}

backupReconciler := controller.NewMantleBackupReconciler(
mgr.GetClient(),
mgr.GetScheme(),
Expand All @@ -173,7 +187,6 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe

restoreReconciler := controller.NewMantleRestoreReconciler(
mgr.GetClient(),
mgr.GetAPIReader(),
mgr.GetScheme(),
managedCephClusterID,
role,
Expand All @@ -193,8 +206,25 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
setupLog.Error(err, "unable to create controller", "controller", "MantleBackupConfig")
return err
}

if err := controller.NewPersistentVolumeReconciler(
mgr.GetClient(),
mgr.GetScheme(),
managedCephClusterID,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeReconciler")
return err
}

//+kubebuilder:scaffold:builder

if err := mgr.Add(
controller.NewGarbageCollectorRunner(mgr.GetClient(), parsedGCInterval, managedCephClusterID),
); err != nil {
setupLog.Error(err, "unable to create runner", "runner", "GarbageCollectorRunner")
return err
}

return nil
}

Expand Down
14 changes: 14 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- persistentvolumes/finalizers
verbs:
- update
- apiGroups:
- ""
resources:
- persistentvolumes/status
verbs:
- get
- patch
- update
- apiGroups:
- batch
resources:
Expand Down
135 changes: 135 additions & 0 deletions internal/controller/garbage_collector_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package controller

import (
"context"
"fmt"
"time"

mantlev1 "github.com/cybozu-go/mantle/api/v1"
corev1 "k8s.io/api/core/v1"
aerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type GarbageCollectorRunner struct {
client client.Client
interval time.Duration
managedCephClusterID string
}

func NewGarbageCollectorRunner(
client client.Client,
interval time.Duration,
managedCephClusterID string,
) *GarbageCollectorRunner {
return &GarbageCollectorRunner{
client: client,
interval: interval,
managedCephClusterID: managedCephClusterID,
}
}

func (r *GarbageCollectorRunner) Start(ctx context.Context) error {
logger := log.FromContext(ctx)

for {
ctxSleep, cancelSleep := context.WithTimeout(ctx, r.interval)
<-ctxSleep.Done()
cancelSleep()
if ctx.Err() != nil {
break
}

logger.Info("garbage collection started")

if err := r.deleteOrphanedPVs(ctx); err != nil {
logger.Error(err, "failed to delete orphaned PVs", "error", err)
}

logger.Info("garbage collection finished")
}

return nil
}

func (r *GarbageCollectorRunner) deleteOrphanedPVs(ctx context.Context) error {
logger := log.FromContext(ctx)

requirement, err := labels.NewRequirement(labelRestoringPVKey, selection.Exists, []string{})
if err != nil {
return fmt.Errorf("failed to create a new labels requirement: %w", err)
}
selector := labels.ValidatedSetSelector{}.Add(*requirement)

var pvList corev1.PersistentVolumeList
if err := r.client.List(ctx, &pvList, &client.ListOptions{
LabelSelector: selector,
}); err != nil {
return fmt.Errorf("failed to list PVs: %w", err)
}

for _, pv := range pvList.Items {
shouldDelete, err := r.isMantleRestoreAlreadyDeleted(ctx, &pv)
if err != nil {
return fmt.Errorf("failed to check if a PV should be deleted: %w", err)
}
if !shouldDelete {
continue
}
if err := r.client.Delete(ctx, &pv, &client.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &pv.ObjectMeta.UID, ResourceVersion: &pv.ObjectMeta.ResourceVersion},
}); err != nil {
return fmt.Errorf("failed to delete PV: %w", err)
}
logger.Info("an orphaned PV is removed", "name", pv.GetName())
}

return nil
}

func (r *GarbageCollectorRunner) isMantleRestoreAlreadyDeleted(ctx context.Context, pv *corev1.PersistentVolume) (bool, error) {
restoreUID, ok := pv.GetAnnotations()[PVAnnotationRestoredBy]
if !ok {
return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredBy, pv.GetName())
}
restoreName, ok := pv.GetAnnotations()[PVAnnotationRestoredByName]
if !ok {
return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredByName, pv.GetName())
}
restoreNamespace, ok := pv.GetAnnotations()[PVAnnotationRestoredByNamespace]
if !ok {
return false, fmt.Errorf("failed to find annotation: %s: %s", PVAnnotationRestoredByNamespace, pv.GetName())
}

clusterID, ok := pv.Spec.CSI.VolumeAttributes["clusterID"]
if !ok {
return false, fmt.Errorf("failed to find cluster ID: %s", pv.GetName())
}
if r.managedCephClusterID != clusterID {
return false, nil
}

var restore mantlev1.MantleRestore
if err := r.client.Get(
ctx,
types.NamespacedName{Name: restoreName, Namespace: restoreNamespace},
&restore,
); err != nil {
if aerrors.IsNotFound(err) {
return true, nil
} else {
return false, fmt.Errorf("failed to get MantleRestore: %s: %w", pv.GetName(), err)
}
}

if string(restore.GetUID()) != restoreUID {
return true, nil
}

return false, nil
}
Loading

0 comments on commit 0ccc41e

Please sign in to comment.