diff --git a/charts/mantle-cluster-wide/templates/clusterrole.yaml b/charts/mantle-cluster-wide/templates/clusterrole.yaml
index 58179c8a..b11e0862 100644
--- a/charts/mantle-cluster-wide/templates/clusterrole.yaml
+++ b/charts/mantle-cluster-wide/templates/clusterrole.yaml
@@ -31,6 +31,7 @@ rules:
- batch
resources:
- cronjobs
+ - jobs
verbs:
- create
- delete
diff --git a/charts/mantle/templates/deployment.yaml b/charts/mantle/templates/deployment.yaml
index ef7e0922..a9b7664d 100644
--- a/charts/mantle/templates/deployment.yaml
+++ b/charts/mantle/templates/deployment.yaml
@@ -67,6 +67,12 @@ spec:
{{- with .Values.controller.overwriteMBCSchedule }}
- --overwrite-mbc-schedule={{ . }}
{{- end }}
+ {{- with .Values.controller.objectStorageBucketName }}
+ - --object-storage-bucket-name={{ . }}
+ {{- end }}
+ {{- with .Values.controller.objectStorageEndpoint}}
+ - --object-storage-endpoint={{ . }}
+ {{- end }}
env:
- name: POD_NAME
valueFrom:
@@ -76,6 +82,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
+ - name: POD_IMAGE
+ value: {{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}
ports:
{{- toYaml .Values.controller.ports | nindent 12 }}
- command:
diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 3f2b5742..572631cc 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -42,13 +42,20 @@ var ControllerCmd = &cobra.Command{
}
var (
- metricsAddr string
- enableLeaderElection bool
- probeAddr string
- zapOpts zap.Options
- overwriteMBCSchedule string
- role string
- mantleServiceEndpoint string
+ metricsAddr string
+ enableLeaderElection bool
+ probeAddr string
+ zapOpts zap.Options
+ overwriteMBCSchedule string
+ role string
+ mantleServiceEndpoint string
+ maxExportJobs int
+ exportDataStorageClass string
+ envSecret string
+ objectStorageBucketName string
+ objectStorageEndpoint string
+ caCertConfigMapSrc string
+ caCertKeySrc string
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
@@ -73,6 +80,21 @@ func init() {
"(i) If --role is 'standalone', this option is ignored. (ii) If --role is 'primary', this option is required "+
"and is interpreted as the address that the primary mantle should connect to. (iii) If --role is 'secondary', "+
"this option is required and is interpreted as the address that the secondary mantle should listen to.")
+ flags.IntVar(&maxExportJobs, "max-export-jobs", 8,
+ "The maximum number of export jobs that can run simultaneously. If you set this to 0, there is no limit.")
+ flags.StringVar(&exportDataStorageClass, "export-data-storage-class", "",
+ "The storage class of PVCs used to store exported data temporarily.")
+ flags.StringVar(&envSecret, "env-secret", "",
+ "The name of the Secret resource that contains environment variables related to the controller and Jobs.")
+ flags.StringVar(&objectStorageBucketName, "object-storage-bucket-name", "",
+ "The bucket name of the object storage which should be used to store backups.")
+ flags.StringVar(&objectStorageEndpoint, "object-storage-endpoint", "",
+ "The endpoint URL to access the object storage.")
+ flags.StringVar(&caCertConfigMapSrc, "ca-cert-configmap", "",
+ "The name of the ConfigMap resource that contains the intermediate certificate used to access the object storage.")
+ flags.StringVar(&caCertKeySrc, "ca-cert-key", "ca.crt",
+ "The key of the ConfigMap specified by --ca-cert-config-map that contains the intermediate certificate. "+
+ "The default value is ca.crt. This option is just ignored if --ca-cert-configmap isn't specified.")
goflags := flag.NewFlagSet("goflags", flag.ExitOnError)
zapOpts.Development = true
@@ -90,12 +112,20 @@ func checkCommandlineArgs() error {
case controller.RoleStandalone:
// nothing to do
case controller.RolePrimary:
- if mantleServiceEndpoint == "" {
- return errors.New("--mantle-service-endpoint must be specified if --role is 'primary'")
- }
+ fallthrough
case controller.RoleSecondary:
if mantleServiceEndpoint == "" {
- return errors.New("--mantle-service-endpoint must be specified if --role is 'secondary'")
+ return errors.New("--mantle-service-endpoint must be specified if --role is 'primary' or 'secondary'")
+ }
+ if caCertConfigMapSrc != "" && caCertKeySrc == "" {
+ return errors.New("--ca-cert-key must be specified if --role is 'primary' or 'secondary', " +
+ "and --ca-cert-configmap is specified")
+ }
+ if objectStorageBucketName == "" {
+ return errors.New("--object-storage-bucket-name must be specified if --role is 'primary' or 'secondary'")
+ }
+ if objectStorageEndpoint == "" {
+ return errors.New("--object-storage-endpoint must be specified if --role is 'primary' or 'secondary'")
}
default:
return fmt.Errorf("role should be one of 'standalone', 'primary', or 'secondary': %s", role)
@@ -110,12 +140,31 @@ func setupReconcilers(mgr manager.Manager, primarySettings *controller.PrimarySe
return errors.New("POD_NAMESPACE is empty")
}
+ podImage := os.Getenv("POD_IMAGE")
+ if podImage == "" {
+ setupLog.Error(errors.New("POD_IMAGE must not be empty"), "POD_IMAGE must not be empty")
+ return errors.New("POD_IMAGE is empty")
+ }
+
+ var caCertConfigMap *string
+ if caCertConfigMapSrc != "" {
+ caCertConfigMap = &caCertConfigMapSrc
+ }
+
backupReconciler := controller.NewMantleBackupReconciler(
mgr.GetClient(),
mgr.GetScheme(),
managedCephClusterID,
role,
primarySettings,
+ podImage,
+ envSecret,
+ &controller.ObjectStorageSettings{
+ BucketName: objectStorageBucketName,
+ Endpoint: objectStorageEndpoint,
+ CACertConfigMap: caCertConfigMap,
+ CACertKey: &caCertKeySrc,
+ },
)
if err := backupReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MantleBackup")
@@ -174,9 +223,11 @@ func setupPrimary(ctx context.Context, mgr manager.Manager, wg *sync.WaitGroup)
}()
primarySettings := &controller.PrimarySettings{
- ServiceEndpoint: mantleServiceEndpoint,
- Conn: conn,
- Client: proto.NewMantleServiceClient(conn),
+ ServiceEndpoint: mantleServiceEndpoint,
+ Conn: conn,
+ Client: proto.NewMantleServiceClient(conn),
+ MaxExportJobs: maxExportJobs,
+ ExportDataStorageClass: exportDataStorageClass,
}
return setupReconcilers(mgr, primarySettings)
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index b8656986..072e30a3 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -31,6 +31,7 @@ rules:
- batch
resources:
- cronjobs
+ - jobs
verbs:
- create
- delete
diff --git a/docs/controller-protocol.md b/docs/controller-protocol.md
index 1e71ff18..bbfdcf2a 100644
--- a/docs/controller-protocol.md
+++ b/docs/controller-protocol.md
@@ -10,6 +10,8 @@
- [CreateOrUpdatePVCResponse](#proto-CreateOrUpdatePVCResponse)
- [ListMantleBackupRequest](#proto-ListMantleBackupRequest)
- [ListMantleBackupResponse](#proto-ListMantleBackupResponse)
+ - [SetSynchronizingRequest](#proto-SetSynchronizingRequest)
+ - [SetSynchronizingResponse](#proto-SetSynchronizingResponse)
- [MantleService](#proto-MantleService)
@@ -111,6 +113,33 @@ ListMantleBackupResponse is a response message for ListMantleBackup RPC.
+
+
+
+### SetSynchronizingRequest
+SetSynchronizingRequest is a request message for SetSynchronize RPC.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| name | [string](#string) | | |
+| namespace | [string](#string) | | |
+| diffFrom | [string](#string) | optional | |
+
+
+
+
+
+
+
+
+### SetSynchronizingResponse
+SetSynchronizingResponse is a response message for SetSynchronize RPC.
+
+
+
+
+
@@ -128,6 +157,7 @@ ListMantleBackupResponse is a response message for ListMantleBackup RPC.
| CreateOrUpdatePVC | [CreateOrUpdatePVCRequest](#proto-CreateOrUpdatePVCRequest) | [CreateOrUpdatePVCResponse](#proto-CreateOrUpdatePVCResponse) | |
| CreateOrUpdateMantleBackup | [CreateOrUpdateMantleBackupRequest](#proto-CreateOrUpdateMantleBackupRequest) | [CreateOrUpdateMantleBackupResponse](#proto-CreateOrUpdateMantleBackupResponse) | |
| ListMantleBackup | [ListMantleBackupRequest](#proto-ListMantleBackupRequest) | [ListMantleBackupResponse](#proto-ListMantleBackupResponse) | |
+| SetSynchronizing | [SetSynchronizingRequest](#proto-SetSynchronizingRequest) | [SetSynchronizingResponse](#proto-SetSynchronizingResponse) | |
diff --git a/internal/controller/internal/testutil/resources.go b/internal/controller/internal/testutil/resources.go
index 7636acbb..7a578d6f 100644
--- a/internal/controller/internal/testutil/resources.go
+++ b/internal/controller/internal/testutil/resources.go
@@ -7,6 +7,7 @@ import (
mantlev1 "github.com/cybozu-go/mantle/api/v1"
"github.com/cybozu-go/mantle/test/util"
. "github.com/onsi/gomega"
+ batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -29,13 +30,26 @@ type ResourceManager struct {
PoolName string
}
-func NewResourceManager(client client.Client) *ResourceManager {
+func NewResourceManager(client client.Client) (*ResourceManager, error) {
+ clusterID := util.GetUniqueName("ceph-")
+
+ // Create a namespace of the same name as cluster ID
+ ns := corev1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: clusterID,
+ },
+ }
+ err := client.Create(context.Background(), &ns)
+ if err != nil {
+ return nil, err
+ }
+
return &ResourceManager{
client: client,
StorageClassName: util.GetUniqueName("sc-"),
- ClusterID: util.GetUniqueName("ceph-"),
+ ClusterID: clusterID,
PoolName: util.GetUniqueName("pool-"),
- }
+ }, nil
}
// EnvTest cannot delete namespace. So, we have to use another new namespace.
@@ -208,6 +222,27 @@ func (r *ResourceManager) WaitForBackupSyncedToRemote(ctx context.Context, backu
}).WithContext(ctx).Should(Succeed())
}
+func (r *ResourceManager) ChangeJobCondition(ctx context.Context, job *batchv1.Job, condType batchv1.JobConditionType, condStatus corev1.ConditionStatus) error {
+ if job.Status.Conditions == nil {
+ job.Status.Conditions = []batchv1.JobCondition{}
+ }
+ updated := false
+ for i := range job.Status.Conditions {
+ if job.Status.Conditions[i].Type == condType {
+ job.Status.Conditions[i].Status = condStatus
+ updated = true
+ break
+ }
+ }
+ if !updated {
+ job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{
+ Type: batchv1.JobComplete,
+ Status: corev1.ConditionTrue,
+ })
+ }
+ return r.client.Status().Update(ctx, job)
+}
+
// cf. https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example
type ObjectConstraint[T any] interface {
client.Object
diff --git a/internal/controller/mantlebackup_controller.go b/internal/controller/mantlebackup_controller.go
index c5a0e539..e677068e 100644
--- a/internal/controller/mantlebackup_controller.go
+++ b/internal/controller/mantlebackup_controller.go
@@ -10,9 +10,11 @@ import (
mantlev1 "github.com/cybozu-go/mantle/api/v1"
"github.com/cybozu-go/mantle/internal/ceph"
"github.com/cybozu-go/mantle/pkg/controller/proto"
+ batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
aerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@@ -33,6 +35,10 @@ const (
labelLocalBackupTargetPVCUID = "mantle.cybozu.io/local-backup-target-pvc-uid"
labelRemoteBackupTargetPVCUID = "mantle.cybozu.io/remote-backup-target-pvc-uid"
+ labelAppNameValue = "mantle"
+ labelComponentExportData = "export-data"
+ labelComponentExportJob = "export-job"
+ labelComponentUploadJob = "upload-job"
annotRemoteUID = "mantle.cybozu.io/remote-uid"
annotDiffFrom = "mantle.cybozu.io/diff-from"
annotDiffTo = "mantle.cybozu.io/diff-to"
@@ -43,27 +49,49 @@ const (
syncModeIncremental = "incremental"
)
+type ObjectStorageSettings struct {
+ CACertConfigMap *string
+ CACertKey *string
+ BucketName string
+ Endpoint string
+}
+
// MantleBackupReconciler reconciles a MantleBackup object
type MantleBackupReconciler struct {
client.Client
- Scheme *runtime.Scheme
- ceph ceph.CephCmd
- managedCephClusterID string
- role string
- primarySettings *PrimarySettings // This should be non-nil if and only if role equals 'primary'.
- expireQueueCh chan event.GenericEvent
+ Scheme *runtime.Scheme
+ ceph ceph.CephCmd
+ managedCephClusterID string
+ role string
+ primarySettings *PrimarySettings // This should be non-nil if and only if role equals 'primary'.
+ expireQueueCh chan event.GenericEvent
+ podImage string
+ envSecret string
+ objectStorageSettings *ObjectStorageSettings // This should be non-nil if and only if role equals 'primary' or 'secondary'.
}
// NewMantleBackupReconciler returns NodeReconciler.
-func NewMantleBackupReconciler(client client.Client, scheme *runtime.Scheme, managedCephClusterID, role string, primarySettings *PrimarySettings) *MantleBackupReconciler {
+func NewMantleBackupReconciler(
+ client client.Client,
+ scheme *runtime.Scheme,
+ managedCephClusterID,
+ role string,
+ primarySettings *PrimarySettings,
+ podImage string,
+ envSecret string,
+ objectStorageSettings *ObjectStorageSettings,
+) *MantleBackupReconciler {
return &MantleBackupReconciler{
- Client: client,
- Scheme: scheme,
- ceph: ceph.NewCephCmd(),
- managedCephClusterID: managedCephClusterID,
- role: role,
- primarySettings: primarySettings,
- expireQueueCh: make(chan event.GenericEvent),
+ Client: client,
+ Scheme: scheme,
+ ceph: ceph.NewCephCmd(),
+ managedCephClusterID: managedCephClusterID,
+ role: role,
+ primarySettings: primarySettings,
+ expireQueueCh: make(chan event.GenericEvent),
+ podImage: podImage,
+ envSecret: envSecret,
+ objectStorageSettings: objectStorageSettings,
}
}
@@ -273,6 +301,7 @@ func (r *MantleBackupReconciler) expire(ctx context.Context, backup *mantlev1.Ma
//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch
//+kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list;watch
+//+kubebuilder:rbac:groups="batch",resources=jobs,verbs=get;list;watch;create;update;patch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -464,13 +493,10 @@ func (r *MantleBackupReconciler) replicate(
return ctrl.Result{}, err
}
- // FIXME: Delete this code after implementing export().
- prepareResult.isSecondaryMantleBackupReadyToUse = true
-
if prepareResult.isSecondaryMantleBackupReadyToUse {
return r.primaryCleanup(ctx, backup)
}
- return r.export(ctx, backup, r.primarySettings.Client, prepareResult)
+ return r.export(ctx, backup, prepareResult)
}
func (r *MantleBackupReconciler) replicateManifests(
@@ -848,17 +874,558 @@ func searchForDiffOriginMantleBackup(
}
func (r *MantleBackupReconciler) export(
- _ context.Context,
- _ *mantlev1.MantleBackup,
- _ proto.MantleServiceClient,
+ ctx context.Context,
+ targetBackup *mantlev1.MantleBackup,
prepareResult *dataSyncPrepareResult,
-) (ctrl.Result, error) { //nolint:unparam
+) (ctrl.Result, error) {
+ sourceBackup := prepareResult.diffFrom
+ var sourceBackupName *string
+ if sourceBackup != nil {
+ s := sourceBackup.GetName()
+ sourceBackupName = &s
+ }
+
+ if err := r.annotateExportTargetMantleBackup(
+ ctx, targetBackup, prepareResult.isIncremental, sourceBackupName,
+ ); err != nil {
+ return ctrl.Result{}, err
+ }
+
if prepareResult.isIncremental {
- return ctrl.Result{}, fmt.Errorf("incremental backup is not implemented")
+ if err := r.annotateExportSourceMantleBackup(ctx, sourceBackup, targetBackup); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+
+ if _, err := r.primarySettings.Client.SetSynchronizing(
+ ctx,
+ &proto.SetSynchronizingRequest{
+ Name: targetBackup.GetName(),
+ Namespace: targetBackup.GetNamespace(),
+ DiffFrom: sourceBackupName,
+ },
+ ); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if result, err := r.checkIfNewJobCanBeCreated(ctx); err != nil || !result.IsZero() {
+ return result, err
+ }
+
+ if err := r.createOrUpdateExportDataPVC(ctx, targetBackup); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if err := r.createOrUpdateExportJob(ctx, targetBackup, sourceBackupName); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ // Update the status of the MantleBackup.
+ // FIXME: this is inserted only for tests and must be removed after implementing the feature to access the object storage.
+ if err := r.updateStatusCondition(ctx, targetBackup, metav1.Condition{
+ Type: mantlev1.BackupConditionSyncedToRemote,
+ Status: metav1.ConditionTrue,
+ Reason: mantlev1.BackupReasonNone,
+ }); err != nil {
+ return ctrl.Result{}, err
}
+
+ if result, err := r.checkIfExportJobIsCompleted(ctx, targetBackup); err != nil || !result.IsZero() {
+ return result, err
+ }
+
+ if err := r.createOrUpdateExportDataUploadJob(ctx, targetBackup); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ return ctrl.Result{Requeue: true}, nil
+}
+
+func (r *MantleBackupReconciler) annotateExportTargetMantleBackup(
+ ctx context.Context,
+ target *mantlev1.MantleBackup,
+ incremental bool,
+ sourceName *string,
+) error {
+ _, err := ctrl.CreateOrUpdate(ctx, r.Client, target, func() error {
+ annot := target.GetAnnotations()
+ if annot == nil {
+ annot = map[string]string{}
+ }
+ if incremental {
+ annot[annotSyncMode] = syncModeIncremental
+ annot[annotDiffFrom] = *sourceName
+ } else {
+ annot[annotSyncMode] = syncModeFull
+ }
+ target.SetAnnotations(annot)
+ return nil
+ })
+ return err
+}
+
+func (r *MantleBackupReconciler) annotateExportSourceMantleBackup(
+ ctx context.Context,
+ source *mantlev1.MantleBackup,
+ target *mantlev1.MantleBackup,
+) error {
+ _, err := ctrl.CreateOrUpdate(ctx, r.Client, source, func() error {
+ annot := source.GetAnnotations()
+ if annot == nil {
+ annot = map[string]string{}
+ }
+ annot[annotDiffTo] = target.GetName()
+ source.SetAnnotations(annot)
+ return nil
+ })
+ return err
+}
+
+func (r *MantleBackupReconciler) checkIfNewJobCanBeCreated(ctx context.Context) (ctrl.Result, error) {
+ if r.primarySettings.MaxExportJobs == 0 {
+ return ctrl.Result{}, nil
+ }
+
+ var jobs batchv1.JobList
+ if err := r.Client.List(ctx, &jobs, &client.ListOptions{
+ Namespace: r.managedCephClusterID,
+ LabelSelector: labels.SelectorFromSet(map[string]string{
+ "app.kubernetes.io/name": labelAppNameValue,
+ "app.kubernetes.io/component": labelComponentExportJob,
+ }),
+ }); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ if len(jobs.Items) >= r.primarySettings.MaxExportJobs {
+ return ctrl.Result{Requeue: true}, nil
+ }
+
return ctrl.Result{}, nil
}
+func (r *MantleBackupReconciler) createOrUpdateExportDataPVC(ctx context.Context, target *mantlev1.MantleBackup) error {
+ var targetPVC corev1.PersistentVolumeClaim
+ if err := json.Unmarshal([]byte(target.Status.PVCManifest), &targetPVC); err != nil {
+ return err
+ }
+
+ pvcSize := targetPVC.Spec.Resources.Requests[corev1.ResourceStorage].DeepCopy()
+ // We assume that any diff data will not exceed twice the size of the target PVC.
+ pvcSize.Mul(2)
+
+ var pvc corev1.PersistentVolumeClaim
+ pvc.SetName(makeExportDataPVCName(target))
+ pvc.SetNamespace(r.managedCephClusterID)
+ _, err := ctrl.CreateOrUpdate(ctx, r.Client, &pvc, func() error {
+ labels := pvc.GetLabels()
+ if labels == nil {
+ labels = make(map[string]string)
+ }
+ labels["app.kubernetes.io/name"] = labelAppNameValue
+ labels["app.kubernetes.io/component"] = labelComponentExportData
+ pvc.SetLabels(labels)
+
+ if pvc.Spec.Resources.Requests == nil {
+ pvc.Spec.Resources.Requests = map[corev1.ResourceName]resource.Quantity{}
+ }
+ pvc.Spec.Resources.Requests[corev1.ResourceStorage] = pvcSize
+
+ pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
+ pvc.Spec.StorageClassName = &r.primarySettings.ExportDataStorageClass
+
+ return nil
+ })
+
+ return err
+}
+
+func makeExportJobName(target *mantlev1.MantleBackup) string {
+ return fmt.Sprintf("mantle-export-%s", target.GetUID())
+}
+
+func makeUploadJobName(target *mantlev1.MantleBackup) string {
+ return fmt.Sprintf("mantle-upload-%s", target.GetUID())
+}
+
+func makeExportDataPVCName(target *mantlev1.MantleBackup) string {
+ return fmt.Sprintf("mantle-export-%s", target.GetUID())
+}
+
+func (r *MantleBackupReconciler) createOrUpdateExportJob(ctx context.Context, target *mantlev1.MantleBackup, sourceBackupNamePtr *string) error {
+ sourceBackupName := ""
+ if sourceBackupNamePtr != nil {
+ sourceBackupName = *sourceBackupNamePtr
+ }
+
+ var pv corev1.PersistentVolume
+ if err := json.Unmarshal([]byte(target.Status.PVManifest), &pv); err != nil {
+ return err
+ }
+
+ var job batchv1.Job
+ job.SetName(makeExportJobName(target))
+ job.SetNamespace(r.managedCephClusterID)
+ if _, err := ctrl.CreateOrUpdate(ctx, r.Client, &job, func() error {
+ labels := job.GetLabels()
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels["app.kubernetes.io/name"] = labelAppNameValue
+ labels["app.kubernetes.io/component"] = labelComponentExportJob
+ job.SetLabels(labels)
+
+ var backoffLimit int32 = 65535
+ job.Spec.BackoffLimit = &backoffLimit
+
+ var fsGroup int64 = 10000
+ var runAsGroup int64 = 10000
+ runAsNonRoot := true
+ var runAsUser int64 = 10000
+ job.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{
+ FSGroup: &fsGroup,
+ RunAsGroup: &runAsGroup,
+ RunAsNonRoot: &runAsNonRoot,
+ RunAsUser: &runAsUser,
+ }
+
+ job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
+
+ job.Spec.Template.Spec.Containers = []corev1.Container{
+ {
+ Name: "export",
+ Command: []string{
+ "/bin/bash",
+ "-c",
+ `
+# This shell script is forked from:
+#
+# https://github.com/rook/rook/blob/fb02f500be4e0b80478366e973abf4e6870693a9/images/ceph/toolbox.sh
+#
+# It is distributed under Apache-2.0 license:
+#
+# Copyright 2016 The Rook Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+set -o pipefail
+
+CEPH_CONFIG="/etc/ceph/ceph.conf"
+MON_CONFIG="/etc/rook/mon-endpoints"
+KEYRING_FILE="/etc/ceph/keyring"
+# create a ceph config file in its default location so ceph/rados tools can be used
+# without specifying any arguments
+write_endpoints() {
+ endpoints=$(cat ${MON_CONFIG})
+ # filter out the mon names
+ # external cluster can have numbers or hyphens in mon names, handling them in regex
+ # shellcheck disable=SC2001
+ mon_endpoints=$(echo "${endpoints}"| sed 's/[a-z0-9_-]\+=//g')
+ DATE=$(date)
+ echo "$DATE writing mon endpoints to ${CEPH_CONFIG}: ${endpoints}"
+ cat < ${CEPH_CONFIG}
+[global]
+mon_host = ${mon_endpoints}
+[client.admin]
+keyring = ${KEYRING_FILE}
+EOF
+}
+# read the secret from an env var (for backward compatibility), or from the secret file
+ceph_secret=${ROOK_CEPH_SECRET}
+if [[ "$ceph_secret" == "" ]]; then
+ ceph_secret=$(cat /var/lib/rook-ceph-mon/secret.keyring)
+fi
+# create the keyring file
+cat < ${KEYRING_FILE}
+[${ROOK_CEPH_USERNAME}]
+key = ${ceph_secret}
+EOF
+# write the initial config file
+write_endpoints
+
+# export diff
+rm -f /mantle/export.bin
+if [ -z "${FROM_SNAP_NAME}" ]; then
+ rbd export-diff -p ${POOL_NAME} ${SRC_IMAGE_NAME}@${SRC_SNAP_NAME} /mantle/export.bin
+else
+ rbd export-diff -p ${POOL_NAME} --from-snap ${FROM_SNAP_NAME} ${SRC_IMAGE_NAME}@${SRC_SNAP_NAME} /mantle/export.bin
+fi`,
+ },
+ Env: []corev1.EnvVar{
+ {
+ Name: "ROOK_CEPH_USERNAME",
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ Key: "ceph-username",
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: "rook-ceph-mon",
+ },
+ },
+ },
+ },
+ {
+ Name: "POOL_NAME",
+ Value: pv.Spec.CSI.VolumeAttributes["pool"],
+ },
+ {
+ Name: "SRC_IMAGE_NAME",
+ Value: pv.Spec.CSI.VolumeAttributes["imageName"],
+ },
+ {
+ Name: "FROM_SNAP_NAME",
+ Value: sourceBackupName,
+ },
+ {
+ Name: "SRC_SNAP_NAME",
+ Value: target.GetName(),
+ },
+ },
+ Image: r.podImage,
+ ImagePullPolicy: corev1.PullIfNotPresent,
+ TTY: true,
+ VolumeMounts: []corev1.VolumeMount{
+ {
+ MountPath: "/etc/ceph",
+ Name: "ceph-config",
+ },
+ {
+ MountPath: "/etc/rook",
+ Name: "mon-endpoint-volume",
+ },
+ {
+ MountPath: "/var/lib/rook-ceph-mon",
+ Name: "ceph-admin-secret",
+ ReadOnly: true,
+ },
+ {
+ MountPath: "/mantle",
+ Name: "volume-to-store",
+ },
+ },
+ },
+ }
+
+ fals := false
+ job.Spec.Template.Spec.Volumes = []corev1.Volume{
+ {
+ Name: "volume-to-store",
+ VolumeSource: corev1.VolumeSource{
+ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+ ClaimName: makeExportDataPVCName(target),
+ },
+ },
+ },
+ {
+ Name: "ceph-admin-secret",
+ VolumeSource: corev1.VolumeSource{
+ Secret: &corev1.SecretVolumeSource{
+ SecretName: "rook-ceph-mon",
+ Optional: &fals,
+ Items: []corev1.KeyToPath{{
+ Key: "ceph-secret",
+ Path: "secret.keyring",
+ }},
+ },
+ },
+ },
+ {
+ Name: "mon-endpoint-volume",
+ VolumeSource: corev1.VolumeSource{
+ ConfigMap: &corev1.ConfigMapVolumeSource{
+ Items: []corev1.KeyToPath{
+ {
+ Key: "data",
+ Path: "mon-endpoints",
+ },
+ },
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: "rook-ceph-mon-endpoints",
+ },
+ },
+ },
+ },
+ {
+ Name: "ceph-config",
+ VolumeSource: corev1.VolumeSource{
+ EmptyDir: &corev1.EmptyDirVolumeSource{},
+ },
+ },
+ }
+
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (r *MantleBackupReconciler) checkIfExportJobIsCompleted(
+ ctx context.Context,
+ target *mantlev1.MantleBackup,
+) (ctrl.Result, error) {
+ var job batchv1.Job
+ if err := r.Client.Get(
+ ctx,
+ types.NamespacedName{
+ Name: makeExportJobName(target),
+ Namespace: r.managedCephClusterID,
+ },
+ &job,
+ ); err != nil {
+ return ctrl.Result{}, err
+ }
+
+ // Check if the export Job is completed or not. Note that we can't use
+ // meta.IsConditionTrue here, because job.Status.Conditions has
+ // []JobCondition type.
+ for _, cond := range job.Status.Conditions {
+ if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
+ return ctrl.Result{}, nil
+ }
+ }
+
+ return ctrl.Result{Requeue: true}, nil
+}
+
+func (r *MantleBackupReconciler) createOrUpdateExportDataUploadJob(ctx context.Context, target *mantlev1.MantleBackup) error {
+ var job batchv1.Job
+ job.SetName(makeUploadJobName(target))
+ job.SetNamespace(r.managedCephClusterID)
+ if _, err := ctrl.CreateOrUpdate(ctx, r.Client, &job, func() error {
+ labels := job.GetLabels()
+ if labels == nil {
+ labels = map[string]string{}
+ }
+ labels["app.kubernetes.io/name"] = labelAppNameValue
+ labels["app.kubernetes.io/component"] = labelComponentUploadJob
+ job.SetLabels(labels)
+
+ var backoffLimit int32 = 65535
+ job.Spec.BackoffLimit = &backoffLimit
+
+ var fsGroup int64 = 10000
+ var runAsGroup int64 = 10000
+ runAsNonRoot := true
+ var runAsUser int64 = 10000
+ job.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{
+ FSGroup: &fsGroup,
+ RunAsGroup: &runAsGroup,
+ RunAsNonRoot: &runAsNonRoot,
+ RunAsUser: &runAsUser,
+ }
+
+ job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
+
+ job.Spec.Template.Spec.Containers = []corev1.Container{
+ {
+ Name: "upload",
+ Command: []string{
+ "/bin/bash",
+ "-c",
+ `
+set -e
+
+if [ "${CERT_FILE}" = "" ]; then
+ s5cmd --endpoint-url ${OBJECT_STORAGE_ENDPOINT} cp /mantle/export.bin "s3://${BUCKET_NAME}/${OBJ_NAME}"
+else
+ s5cmd --endpoint-url ${OBJECT_STORAGE_ENDPOINT} --credentials-file ${CERT_FILE} cp /mantle/export.bin "s3://${BUCKET_NAME}/${OBJ_NAME}"
+end`,
+ },
+ Env: []corev1.EnvVar{
+ {
+ Name: "OBJ_NAME",
+ Value: fmt.Sprintf("%s-%s.bin", target.GetName(), target.GetUID()),
+ },
+ {
+ Name: "BUCKET_NAME",
+ Value: r.objectStorageSettings.BucketName,
+ },
+ {
+ Name: "OBJECT_STORAGE_ENDPOINT",
+ Value: r.objectStorageSettings.Endpoint,
+ },
+ },
+ EnvFrom: []corev1.EnvFromSource{
+ {
+ SecretRef: &corev1.SecretEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: r.envSecret,
+ },
+ },
+ },
+ },
+ Image: r.podImage,
+ ImagePullPolicy: corev1.PullIfNotPresent,
+ VolumeMounts: []corev1.VolumeMount{
+ {
+ MountPath: "/mantle",
+ Name: "volume-to-store",
+ },
+ },
+ },
+ }
+
+ job.Spec.Template.Spec.Volumes = []corev1.Volume{
+ {
+ Name: "volume-to-store",
+ VolumeSource: corev1.VolumeSource{
+ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+ ClaimName: makeExportDataPVCName(target),
+ },
+ },
+ },
+ }
+
+ if r.objectStorageSettings.CACertConfigMap != nil {
+ container := job.Spec.Template.Spec.Containers[0]
+ container.Env = append(
+ container.Env,
+ corev1.EnvVar{
+ Name: "CERT_FILE",
+ Value: fmt.Sprintf("/mantle_ca_cert/%s", *r.objectStorageSettings.CACertKey),
+ },
+ )
+ container.VolumeMounts = append(
+ container.VolumeMounts,
+ corev1.VolumeMount{
+ MountPath: "/mantle_ca_cert",
+ Name: "ca-cert",
+ },
+ )
+ job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes,
+ corev1.Volume{
+ Name: "ca-cert",
+ VolumeSource: corev1.VolumeSource{
+ ConfigMap: &corev1.ConfigMapVolumeSource{
+ LocalObjectReference: corev1.LocalObjectReference{
+ Name: *r.objectStorageSettings.CACertConfigMap,
+ },
+ },
+ },
+ },
+ )
+ }
+
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func (r *MantleBackupReconciler) startImport(
ctx context.Context,
backup *mantlev1.MantleBackup,
diff --git a/internal/controller/mantlebackup_controller_test.go b/internal/controller/mantlebackup_controller_test.go
index ec6e7329..d41c74b5 100644
--- a/internal/controller/mantlebackup_controller_test.go
+++ b/internal/controller/mantlebackup_controller_test.go
@@ -15,12 +15,14 @@ import (
. "github.com/onsi/gomega"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
+ batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/kube-openapi/pkg/validation/strfmt"
+ ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -110,6 +112,9 @@ var _ = Describe("MantleBackup controller", func() {
resMgr.ClusterID,
RoleStandalone,
nil,
+ "dummy image",
+ "",
+ nil,
)
reconciler.ceph = testutil.NewFakeRBD()
err := reconciler.SetupWithManager(mgrUtil.GetManager())
@@ -333,7 +338,16 @@ var _ = Describe("MantleBackup controller", func() {
resMgr.ClusterID,
RolePrimary,
&PrimarySettings{
- Client: grpcClient,
+ Client: grpcClient,
+ ExportDataStorageClass: resMgr.StorageClassName,
+ },
+ "dummy image",
+ "dummy-secret-env",
+ &ObjectStorageSettings{
+ BucketName: "",
+ Endpoint: "",
+ CACertConfigMap: nil,
+ CACertKey: nil,
},
)
reconciler.ceph = testutil.NewFakeRBD()
@@ -359,7 +373,7 @@ var _ = Describe("MantleBackup controller", func() {
&proto.CreateOrUpdatePVCResponse{
Uid: "a7c9d5e2-4b8f-4e2a-9d3f-1b6a7c8e9f2b",
}, nil)
- var targetBackup mantlev1.MantleBackup
+ targetBackups := []*mantlev1.MantleBackup{}
grpcClient.EXPECT().CreateOrUpdateMantleBackup(gomock.Any(), gomock.Any()).
MinTimes(1).
DoAndReturn(
@@ -368,10 +382,12 @@ var _ = Describe("MantleBackup controller", func() {
req *proto.CreateOrUpdateMantleBackupRequest,
opts ...grpc.CallOption,
) (*proto.CreateOrUpdateMantleBackupResponse, error) {
+ var targetBackup mantlev1.MantleBackup
err := json.Unmarshal(req.GetMantleBackup(), &targetBackup)
if err != nil {
panic(err)
}
+ targetBackups = append(targetBackups, &targetBackup)
return &proto.CreateOrUpdateMantleBackupResponse{}, nil
})
grpcClient.EXPECT().ListMantleBackup(gomock.Any(), gomock.Any()).
@@ -382,7 +398,7 @@ var _ = Describe("MantleBackup controller", func() {
req *proto.ListMantleBackupRequest,
opts ...grpc.CallOption,
) (*proto.ListMantleBackupResponse, error) {
- data, err := json.Marshal([]mantlev1.MantleBackup{targetBackup})
+ data, err := json.Marshal(targetBackups)
if err != nil {
panic(err)
}
@@ -390,6 +406,16 @@ var _ = Describe("MantleBackup controller", func() {
MantleBackupList: data,
}, nil
})
+ grpcClient.EXPECT().SetSynchronizing(gomock.Any(), gomock.Any()).
+ MinTimes(1).
+ DoAndReturn(
+ func(
+ ctx context.Context,
+ req *proto.SetSynchronizingRequest,
+ opts ...grpc.CallOption,
+ ) (*proto.SetSynchronizingResponse, error) {
+ return &proto.SetSynchronizingResponse{}, nil
+ })
pv, pvc, err := resMgr.CreateUniquePVAndPVC(ctx, ns)
Expect(err).NotTo(HaveOccurred())
@@ -422,12 +448,111 @@ var _ = Describe("MantleBackup controller", func() {
snapID := backup.Status.SnapID
Expect(snapID).To(Equal(&snaps[0].Id))
- // TODO: Currently, there is no way to check if the annotations are set correctly.
- // After implementing export() function, the annotations check should be added
- // for various conditions.
+ // Make sure export() correctly annotates the MantleBackup resource.
+ syncMode, ok := backup.GetAnnotations()[annotSyncMode]
+ Expect(ok).To(BeTrue())
+ Expect(syncMode).To(Equal(syncModeFull))
+
+ // Make sure export() creates a PVC for exported data
+ var pvcExport corev1.PersistentVolumeClaim
+ err = k8sClient.Get(
+ ctx,
+ types.NamespacedName{
+ Name: fmt.Sprintf("mantle-export-%s", backup.GetUID()),
+ Namespace: resMgr.ClusterID,
+ },
+ &pvcExport,
+ )
+ Expect(err).NotTo(HaveOccurred())
+ Expect(pvcExport.GetLabels()["app.kubernetes.io/name"]).To(Equal("mantle"))
+ Expect(pvcExport.GetLabels()["app.kubernetes.io/component"]).To(Equal("export-data"))
+ Expect(pvcExport.Spec.AccessModes[0]).To(Equal(corev1.ReadWriteOnce))
+ Expect(*pvcExport.Spec.StorageClassName).To(Equal(resMgr.StorageClassName))
+ Expect(pvcExport.Spec.Resources.Requests.Storage().String()).To(Equal("2Gi"))
+
+ // Make sure export() creates a Job to export data.
+ var jobExport batchv1.Job
+ err = k8sClient.Get(
+ ctx,
+ types.NamespacedName{
+ Name: fmt.Sprintf("mantle-export-%s", backup.GetUID()),
+ Namespace: resMgr.ClusterID,
+ },
+ &jobExport,
+ )
+ Expect(err).NotTo(HaveOccurred())
+ Expect(jobExport.GetLabels()["app.kubernetes.io/name"]).To(Equal("mantle"))
+ Expect(jobExport.GetLabels()["app.kubernetes.io/component"]).To(Equal("export-job"))
+ Expect(*jobExport.Spec.BackoffLimit).To(Equal(int32(65535)))
+ Expect(*jobExport.Spec.Template.Spec.SecurityContext.FSGroup).To(Equal(int64(10000)))
+ Expect(*jobExport.Spec.Template.Spec.SecurityContext.RunAsUser).To(Equal(int64(10000)))
+ Expect(*jobExport.Spec.Template.Spec.SecurityContext.RunAsGroup).To(Equal(int64(10000)))
+ Expect(*jobExport.Spec.Template.Spec.SecurityContext.RunAsNonRoot).To(Equal(true))
+
+ // Make the export Job completed to proceed the reconciliation for backup.
+ err = resMgr.ChangeJobCondition(ctx, &jobExport, batchv1.JobComplete, corev1.ConditionTrue)
+ Expect(err).NotTo(HaveOccurred())
+
+ // Make sure the upload Job is created
+ Eventually(func(g Gomega, ctx context.Context) {
+ var jobUpload batchv1.Job
+ err = k8sClient.Get(
+ ctx,
+ types.NamespacedName{
+ Name: fmt.Sprintf("mantle-upload-%s", backup.GetUID()),
+ Namespace: resMgr.ClusterID,
+ },
+ &jobUpload,
+ )
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(jobUpload.GetLabels()["app.kubernetes.io/name"]).To(Equal("mantle"))
+ g.Expect(jobUpload.GetLabels()["app.kubernetes.io/component"]).To(Equal("upload-job"))
+ g.Expect(*jobUpload.Spec.BackoffLimit).To(Equal(int32(65535)))
+ g.Expect(*jobUpload.Spec.Template.Spec.SecurityContext.FSGroup).To(Equal(int64(10000)))
+ g.Expect(*jobUpload.Spec.Template.Spec.SecurityContext.RunAsUser).To(Equal(int64(10000)))
+ g.Expect(*jobUpload.Spec.Template.Spec.SecurityContext.RunAsGroup).To(Equal(int64(10000)))
+ g.Expect(*jobUpload.Spec.Template.Spec.SecurityContext.RunAsNonRoot).To(Equal(true))
+ }).WithContext(ctx).Should(Succeed())
+
+ // Make the all existing MantleBackups in the (mocked) secondary Mantle
+ // ReadyToUse=True.
+ for _, backup := range targetBackups {
+ meta.SetStatusCondition(&backup.Status.Conditions, metav1.Condition{
+ Type: mantlev1.BackupConditionReadyToUse,
+ Status: metav1.ConditionTrue,
+ Reason: mantlev1.BackupReasonNone,
+ })
+ }
+
+ // Create another MantleBackup (backup2) to make sure it should become a incremental backup.
+ backup2, err := resMgr.CreateUniqueBackupFor(ctx, pvc)
+ Expect(err).NotTo(HaveOccurred())
+ waitForHavingFinalizer(ctx, backup2)
+ resMgr.WaitForBackupReady(ctx, backup2)
+ resMgr.WaitForBackupSyncedToRemote(ctx, backup2)
+
+ // Make sure backup2 is an incremental backup.
+ syncMode2, ok := backup2.GetAnnotations()[annotSyncMode]
+ Expect(ok).To(BeTrue())
+ Expect(syncMode2).To(Equal(syncModeIncremental))
+ err = k8sClient.Get(ctx, types.NamespacedName{Name: backup.GetName(), Namespace: backup.GetNamespace()}, backup)
+ Expect(err).NotTo(HaveOccurred())
+ diffTo, ok := backup.GetAnnotations()[annotDiffTo]
+ Expect(ok).To(BeTrue())
+ Expect(diffTo).To(Equal(backup2.GetName()))
+
+ // remove diffTo annotation of backup here to allow it to be deleted.
+ // FIXME: this process is for testing purposes only and should be removed in the near future.
+ _, err = ctrl.CreateOrUpdate(ctx, k8sClient, backup, func() error {
+ delete(backup.Annotations, annotDiffTo)
+ return nil
+ })
+ Expect(err).NotTo(HaveOccurred())
err = k8sClient.Delete(ctx, backup)
Expect(err).NotTo(HaveOccurred())
+ err = k8sClient.Delete(ctx, backup2)
+ Expect(err).NotTo(HaveOccurred())
testutil.CheckDeletedEventually[mantlev1.MantleBackup](ctx, k8sClient, backup.Name, backup.Namespace)
})
@@ -611,7 +736,7 @@ var _ = Describe("prepareForDataSynchronization", func() {
}
mbr := NewMantleBackupReconciler(ctrlClient,
- ctrlClient.Scheme(), "test", RolePrimary, nil)
+ ctrlClient.Scheme(), "test", RolePrimary, nil, "dummy image", "", nil)
ret, err := mbr.prepareForDataSynchronization(context.Background(),
backup, grpcClient)
diff --git a/internal/controller/mantlerestore_controller_test.go b/internal/controller/mantlerestore_controller_test.go
index 5ebe1b59..44a727b6 100644
--- a/internal/controller/mantlerestore_controller_test.go
+++ b/internal/controller/mantlerestore_controller_test.go
@@ -54,6 +54,9 @@ func (test *mantleRestoreControllerUnitTest) setupEnv() {
resMgr.ClusterID,
RoleStandalone,
nil,
+ "dummy image",
+ "",
+ nil,
)
backupReconciler.ceph = testutil.NewFakeRBD()
err := backupReconciler.SetupWithManager(test.mgrUtil.GetManager())
diff --git a/internal/controller/replication.go b/internal/controller/replication.go
index 86151861..47e6fb37 100644
--- a/internal/controller/replication.go
+++ b/internal/controller/replication.go
@@ -3,12 +3,14 @@ package controller
import (
"context"
"encoding/json"
+ "errors"
"fmt"
mantlev1 "github.com/cybozu-go/mantle/api/v1"
"github.com/cybozu-go/mantle/pkg/controller/proto"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
@@ -22,9 +24,11 @@ const (
)
type PrimarySettings struct {
- ServiceEndpoint string
- Conn *grpc.ClientConn
- Client proto.MantleServiceClient
+ ServiceEndpoint string
+ Conn *grpc.ClientConn
+ Client proto.MantleServiceClient
+ MaxExportJobs int
+ ExportDataStorageClass string
}
type SecondaryServer struct {
@@ -204,3 +208,87 @@ func (s *SecondaryServer) ListMantleBackup(
}
return &proto.ListMantleBackupResponse{MantleBackupList: data}, nil
}
+
+func (s *SecondaryServer) SetSynchronizing(
+ ctx context.Context,
+ req *proto.SetSynchronizingRequest,
+) (*proto.SetSynchronizingResponse, error) {
+ target := mantlev1.MantleBackup{}
+ if err := s.client.Get(
+ ctx,
+ types.NamespacedName{Name: req.Name, Namespace: req.Namespace},
+ &target,
+ ); err != nil {
+ return nil, err
+ }
+
+ if meta.IsStatusConditionTrue(target.Status.Conditions, mantlev1.BackupConditionReadyToUse) {
+ return nil, errors.New("ReadyToUse is true")
+ }
+
+ // make sure sync-mode is correct.
+ if syncMode, ok := target.GetAnnotations()[annotSyncMode]; ok {
+ if syncMode == syncModeFull && req.DiffFrom != nil {
+ return nil, fmt.Errorf("annotated sync-mode is full but req.DiffFrom is not nil: %s", *req.DiffFrom)
+ }
+ if syncMode == syncModeIncremental && req.DiffFrom == nil {
+ return nil, fmt.Errorf("annotated sync-mode is incremental but req.DiffFrom is nil")
+ }
+ }
+
+ // make sure diff-from is correct.
+ if diffFrom, ok := target.GetAnnotations()[annotDiffFrom]; ok {
+ if req.DiffFrom == nil {
+ return nil, errors.New("annotated diff-from is not nil but req.DiffFrom is nil")
+ }
+ if *req.DiffFrom != diffFrom {
+ return nil, fmt.Errorf("annotated diff-from is not equal to req.DiffFrom: %s: %s", diffFrom, *req.DiffFrom)
+ }
+ }
+
+ if req.DiffFrom != nil {
+ source := mantlev1.MantleBackup{}
+ if err := s.client.Get(
+ ctx,
+ types.NamespacedName{Name: *req.DiffFrom, Namespace: req.Namespace},
+ &source,
+ ); err != nil {
+ return nil, err
+ }
+
+ if diffTo, ok := source.GetAnnotations()[annotDiffTo]; ok && diffTo != req.Name {
+ return nil, errors.New("diffTo is invalid")
+ }
+
+ if _, err := ctrl.CreateOrUpdate(ctx, s.client, &source, func() error {
+ annot := source.GetAnnotations()
+ if annot == nil {
+ annot = map[string]string{}
+ }
+ annot[annotDiffTo] = req.Name
+ source.SetAnnotations(annot)
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+ }
+
+ if _, err := ctrl.CreateOrUpdate(ctx, s.client, &target, func() error {
+ annot := target.GetAnnotations()
+ if annot == nil {
+ annot = map[string]string{}
+ }
+ if req.DiffFrom == nil {
+ annot[annotSyncMode] = syncModeFull
+ } else {
+ annot[annotSyncMode] = syncModeIncremental
+ annot[annotDiffFrom] = *req.DiffFrom
+ }
+ target.SetAnnotations(annot)
+ return nil
+ }); err != nil {
+ return nil, err
+ }
+
+ return &proto.SetSynchronizingResponse{}, nil
+}
diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go
index 18dc84ec..6ece714e 100644
--- a/internal/controller/suite_test.go
+++ b/internal/controller/suite_test.go
@@ -74,7 +74,8 @@ var _ = BeforeSuite(func(ctx SpecContext) {
Expect(k8sClient).NotTo(BeNil())
By("Setup common resources")
- resMgr = testutil.NewResourceManager(k8sClient)
+ resMgr, err = testutil.NewResourceManager(k8sClient)
+ Expect(err).NotTo(HaveOccurred())
err = resMgr.CreateStorageClass(ctx)
Expect(err).NotTo(HaveOccurred())
})
diff --git a/pkg/controller/proto/controller.pb.go b/pkg/controller/proto/controller.pb.go
index 25862996..767bb54d 100644
--- a/pkg/controller/proto/controller.pb.go
+++ b/pkg/controller/proto/controller.pb.go
@@ -295,6 +295,105 @@ func (x *ListMantleBackupResponse) GetMantleBackupList() []byte {
return nil
}
+// SetSynchronizingRequest is a request message for SetSynchronize RPC.
+type SetSynchronizingRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
+ DiffFrom *string `protobuf:"bytes,3,opt,name=diffFrom,proto3,oneof" json:"diffFrom,omitempty"`
+}
+
+func (x *SetSynchronizingRequest) Reset() {
+ *x = SetSynchronizingRequest{}
+ mi := &file_pkg_controller_proto_controller_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *SetSynchronizingRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SetSynchronizingRequest) ProtoMessage() {}
+
+func (x *SetSynchronizingRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_pkg_controller_proto_controller_proto_msgTypes[6]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SetSynchronizingRequest.ProtoReflect.Descriptor instead.
+func (*SetSynchronizingRequest) Descriptor() ([]byte, []int) {
+ return file_pkg_controller_proto_controller_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *SetSynchronizingRequest) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
+func (x *SetSynchronizingRequest) GetNamespace() string {
+ if x != nil {
+ return x.Namespace
+ }
+ return ""
+}
+
+func (x *SetSynchronizingRequest) GetDiffFrom() string {
+ if x != nil && x.DiffFrom != nil {
+ return *x.DiffFrom
+ }
+ return ""
+}
+
+// SetSynchronizingResponse is a response message for SetSynchronize RPC.
+type SetSynchronizingResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *SetSynchronizingResponse) Reset() {
+ *x = SetSynchronizingResponse{}
+ mi := &file_pkg_controller_proto_controller_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *SetSynchronizingResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SetSynchronizingResponse) ProtoMessage() {}
+
+func (x *SetSynchronizingResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_pkg_controller_proto_controller_proto_msgTypes[7]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SetSynchronizingResponse.ProtoReflect.Descriptor instead.
+func (*SetSynchronizingResponse) Descriptor() ([]byte, []int) {
+ return file_pkg_controller_proto_controller_proto_rawDescGZIP(), []int{7}
+}
+
var File_pkg_controller_proto_controller_proto protoreflect.FileDescriptor
var file_pkg_controller_proto_controller_proto_rawDesc = []byte{
@@ -323,30 +422,44 @@ var file_pkg_controller_proto_controller_proto_rawDesc = []byte{
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x6d, 0x61, 0x6e, 0x74, 0x6c,
0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x10, 0x6d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x4c,
- 0x69, 0x73, 0x74, 0x32, 0xaf, 0x02, 0x0a, 0x0d, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x53, 0x65,
- 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x56, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f,
- 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x56, 0x43, 0x12, 0x1f, 0x2e, 0x70, 0x72, 0x6f,
- 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74,
- 0x65, 0x50, 0x56, 0x43, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61,
- 0x74, 0x65, 0x50, 0x56, 0x43, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x71, 0x0a,
- 0x1a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d,
- 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x28, 0x2e, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61,
- 0x74, 0x65, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72,
- 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, 0x61, 0x6e, 0x74,
- 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x12, 0x53, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61,
- 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x69, 0x73,
- 0x74, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x69, 0x73,
- 0x74, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
- 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x79, 0x62, 0x6f, 0x7a, 0x75, 0x2d, 0x67, 0x6f, 0x2f, 0x6d, 0x61,
- 0x6e, 0x74, 0x6c, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c,
- 0x6c, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x33,
+ 0x69, 0x73, 0x74, 0x22, 0x79, 0x0a, 0x17, 0x53, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x68, 0x72,
+ 0x6f, 0x6e, 0x69, 0x7a, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12,
+ 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65,
+ 0x12, 0x1f, 0x0a, 0x08, 0x64, 0x69, 0x66, 0x66, 0x46, 0x72, 0x6f, 0x6d, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x64, 0x69, 0x66, 0x66, 0x46, 0x72, 0x6f, 0x6d, 0x88, 0x01,
+ 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x64, 0x69, 0x66, 0x66, 0x46, 0x72, 0x6f, 0x6d, 0x22, 0x1a,
+ 0x0a, 0x18, 0x53, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x69,
+ 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x84, 0x03, 0x0a, 0x0d, 0x4d,
+ 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x56, 0x0a, 0x11,
+ 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x56,
+ 0x43, 0x12, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65,
+ 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x56, 0x43, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74,
+ 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x56, 0x43, 0x52, 0x65, 0x73, 0x70,
+ 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x71, 0x0a, 0x1a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72,
+ 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b,
+ 0x75, 0x70, 0x12, 0x28, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74,
+ 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42,
+ 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4f, 0x72, 0x55, 0x70, 0x64,
+ 0x61, 0x74, 0x65, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x53, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x4d,
+ 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x12, 0x1e, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61,
+ 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x6e, 0x74, 0x6c, 0x65, 0x42, 0x61,
+ 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x53, 0x0a, 0x10,
+ 0x53, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63, 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x69, 0x6e, 0x67,
+ 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63,
+ 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x65, 0x74, 0x53, 0x79, 0x6e, 0x63,
+ 0x68, 0x72, 0x6f, 0x6e, 0x69, 0x7a, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x63, 0x79, 0x62, 0x6f, 0x7a, 0x75, 0x2d, 0x67, 0x6f, 0x2f, 0x6d, 0x61, 0x6e, 0x74, 0x6c, 0x65,
+ 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2f,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -361,7 +474,7 @@ func file_pkg_controller_proto_controller_proto_rawDescGZIP() []byte {
return file_pkg_controller_proto_controller_proto_rawDescData
}
-var file_pkg_controller_proto_controller_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_pkg_controller_proto_controller_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_pkg_controller_proto_controller_proto_goTypes = []any{
(*CreateOrUpdatePVCRequest)(nil), // 0: proto.CreateOrUpdatePVCRequest
(*CreateOrUpdatePVCResponse)(nil), // 1: proto.CreateOrUpdatePVCResponse
@@ -369,16 +482,20 @@ var file_pkg_controller_proto_controller_proto_goTypes = []any{
(*CreateOrUpdateMantleBackupResponse)(nil), // 3: proto.CreateOrUpdateMantleBackupResponse
(*ListMantleBackupRequest)(nil), // 4: proto.ListMantleBackupRequest
(*ListMantleBackupResponse)(nil), // 5: proto.ListMantleBackupResponse
+ (*SetSynchronizingRequest)(nil), // 6: proto.SetSynchronizingRequest
+ (*SetSynchronizingResponse)(nil), // 7: proto.SetSynchronizingResponse
}
var file_pkg_controller_proto_controller_proto_depIdxs = []int32{
0, // 0: proto.MantleService.CreateOrUpdatePVC:input_type -> proto.CreateOrUpdatePVCRequest
2, // 1: proto.MantleService.CreateOrUpdateMantleBackup:input_type -> proto.CreateOrUpdateMantleBackupRequest
4, // 2: proto.MantleService.ListMantleBackup:input_type -> proto.ListMantleBackupRequest
- 1, // 3: proto.MantleService.CreateOrUpdatePVC:output_type -> proto.CreateOrUpdatePVCResponse
- 3, // 4: proto.MantleService.CreateOrUpdateMantleBackup:output_type -> proto.CreateOrUpdateMantleBackupResponse
- 5, // 5: proto.MantleService.ListMantleBackup:output_type -> proto.ListMantleBackupResponse
- 3, // [3:6] is the sub-list for method output_type
- 0, // [0:3] is the sub-list for method input_type
+ 6, // 3: proto.MantleService.SetSynchronizing:input_type -> proto.SetSynchronizingRequest
+ 1, // 4: proto.MantleService.CreateOrUpdatePVC:output_type -> proto.CreateOrUpdatePVCResponse
+ 3, // 5: proto.MantleService.CreateOrUpdateMantleBackup:output_type -> proto.CreateOrUpdateMantleBackupResponse
+ 5, // 6: proto.MantleService.ListMantleBackup:output_type -> proto.ListMantleBackupResponse
+ 7, // 7: proto.MantleService.SetSynchronizing:output_type -> proto.SetSynchronizingResponse
+ 4, // [4:8] is the sub-list for method output_type
+ 0, // [0:4] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
@@ -389,13 +506,14 @@ func file_pkg_controller_proto_controller_proto_init() {
if File_pkg_controller_proto_controller_proto != nil {
return
}
+ file_pkg_controller_proto_controller_proto_msgTypes[6].OneofWrappers = []any{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_controller_proto_controller_proto_rawDesc,
NumEnums: 0,
- NumMessages: 6,
+ NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/pkg/controller/proto/controller.proto b/pkg/controller/proto/controller.proto
index 58822953..3a61fa00 100644
--- a/pkg/controller/proto/controller.proto
+++ b/pkg/controller/proto/controller.proto
@@ -35,8 +35,20 @@ message ListMantleBackupResponse {
bytes mantleBackupList = 1;
}
+// SetSynchronizingRequest is a request message for SetSynchronize RPC.
+message SetSynchronizingRequest {
+ string name = 1;
+ string namespace = 2;
+ optional string diffFrom = 3;
+}
+
+// SetSynchronizingResponse is a response message for SetSynchronize RPC.
+message SetSynchronizingResponse {
+}
+
service MantleService {
rpc CreateOrUpdatePVC(CreateOrUpdatePVCRequest) returns (CreateOrUpdatePVCResponse);
rpc CreateOrUpdateMantleBackup(CreateOrUpdateMantleBackupRequest) returns (CreateOrUpdateMantleBackupResponse);
rpc ListMantleBackup(ListMantleBackupRequest) returns (ListMantleBackupResponse);
+ rpc SetSynchronizing(SetSynchronizingRequest) returns (SetSynchronizingResponse);
}
diff --git a/pkg/controller/proto/controller_grpc.pb.go b/pkg/controller/proto/controller_grpc.pb.go
index 16b126c8..577dc53a 100644
--- a/pkg/controller/proto/controller_grpc.pb.go
+++ b/pkg/controller/proto/controller_grpc.pb.go
@@ -22,6 +22,7 @@ const (
MantleService_CreateOrUpdatePVC_FullMethodName = "/proto.MantleService/CreateOrUpdatePVC"
MantleService_CreateOrUpdateMantleBackup_FullMethodName = "/proto.MantleService/CreateOrUpdateMantleBackup"
MantleService_ListMantleBackup_FullMethodName = "/proto.MantleService/ListMantleBackup"
+ MantleService_SetSynchronizing_FullMethodName = "/proto.MantleService/SetSynchronizing"
)
// MantleServiceClient is the client API for MantleService service.
@@ -31,6 +32,7 @@ type MantleServiceClient interface {
CreateOrUpdatePVC(ctx context.Context, in *CreateOrUpdatePVCRequest, opts ...grpc.CallOption) (*CreateOrUpdatePVCResponse, error)
CreateOrUpdateMantleBackup(ctx context.Context, in *CreateOrUpdateMantleBackupRequest, opts ...grpc.CallOption) (*CreateOrUpdateMantleBackupResponse, error)
ListMantleBackup(ctx context.Context, in *ListMantleBackupRequest, opts ...grpc.CallOption) (*ListMantleBackupResponse, error)
+ SetSynchronizing(ctx context.Context, in *SetSynchronizingRequest, opts ...grpc.CallOption) (*SetSynchronizingResponse, error)
}
type mantleServiceClient struct {
@@ -71,6 +73,16 @@ func (c *mantleServiceClient) ListMantleBackup(ctx context.Context, in *ListMant
return out, nil
}
+func (c *mantleServiceClient) SetSynchronizing(ctx context.Context, in *SetSynchronizingRequest, opts ...grpc.CallOption) (*SetSynchronizingResponse, error) {
+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+ out := new(SetSynchronizingResponse)
+ err := c.cc.Invoke(ctx, MantleService_SetSynchronizing_FullMethodName, in, out, cOpts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// MantleServiceServer is the server API for MantleService service.
// All implementations must embed UnimplementedMantleServiceServer
// for forward compatibility.
@@ -78,6 +90,7 @@ type MantleServiceServer interface {
CreateOrUpdatePVC(context.Context, *CreateOrUpdatePVCRequest) (*CreateOrUpdatePVCResponse, error)
CreateOrUpdateMantleBackup(context.Context, *CreateOrUpdateMantleBackupRequest) (*CreateOrUpdateMantleBackupResponse, error)
ListMantleBackup(context.Context, *ListMantleBackupRequest) (*ListMantleBackupResponse, error)
+ SetSynchronizing(context.Context, *SetSynchronizingRequest) (*SetSynchronizingResponse, error)
mustEmbedUnimplementedMantleServiceServer()
}
@@ -97,6 +110,9 @@ func (UnimplementedMantleServiceServer) CreateOrUpdateMantleBackup(context.Conte
func (UnimplementedMantleServiceServer) ListMantleBackup(context.Context, *ListMantleBackupRequest) (*ListMantleBackupResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListMantleBackup not implemented")
}
+func (UnimplementedMantleServiceServer) SetSynchronizing(context.Context, *SetSynchronizingRequest) (*SetSynchronizingResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SetSynchronizing not implemented")
+}
func (UnimplementedMantleServiceServer) mustEmbedUnimplementedMantleServiceServer() {}
func (UnimplementedMantleServiceServer) testEmbeddedByValue() {}
@@ -172,6 +188,24 @@ func _MantleService_ListMantleBackup_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
+func _MantleService_SetSynchronizing_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(SetSynchronizingRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(MantleServiceServer).SetSynchronizing(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: MantleService_SetSynchronizing_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(MantleServiceServer).SetSynchronizing(ctx, req.(*SetSynchronizingRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
// MantleService_ServiceDesc is the grpc.ServiceDesc for MantleService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -191,6 +225,10 @@ var MantleService_ServiceDesc = grpc.ServiceDesc{
MethodName: "ListMantleBackup",
Handler: _MantleService_ListMantleBackup_Handler,
},
+ {
+ MethodName: "SetSynchronizing",
+ Handler: _MantleService_SetSynchronizing_Handler,
+ },
},
Streams: []grpc.StreamDesc{},
Metadata: "pkg/controller/proto/controller.proto",
diff --git a/pkg/controller/proto/controller_grpc.pb_mock.go b/pkg/controller/proto/controller_grpc.pb_mock.go
index 5c49bf7b..31e0210b 100644
--- a/pkg/controller/proto/controller_grpc.pb_mock.go
+++ b/pkg/controller/proto/controller_grpc.pb_mock.go
@@ -101,6 +101,26 @@ func (mr *MockMantleServiceClientMockRecorder) ListMantleBackup(ctx, in any, opt
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListMantleBackup", reflect.TypeOf((*MockMantleServiceClient)(nil).ListMantleBackup), varargs...)
}
+// SetSynchronizing mocks base method.
+func (m *MockMantleServiceClient) SetSynchronizing(ctx context.Context, in *SetSynchronizingRequest, opts ...grpc.CallOption) (*SetSynchronizingResponse, error) {
+ m.ctrl.T.Helper()
+ varargs := []any{ctx, in}
+ for _, a := range opts {
+ varargs = append(varargs, a)
+ }
+ ret := m.ctrl.Call(m, "SetSynchronizing", varargs...)
+ ret0, _ := ret[0].(*SetSynchronizingResponse)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SetSynchronizing indicates an expected call of SetSynchronizing.
+func (mr *MockMantleServiceClientMockRecorder) SetSynchronizing(ctx, in any, opts ...any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ varargs := append([]any{ctx, in}, opts...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSynchronizing", reflect.TypeOf((*MockMantleServiceClient)(nil).SetSynchronizing), varargs...)
+}
+
// MockMantleServiceServer is a mock of MantleServiceServer interface.
type MockMantleServiceServer struct {
ctrl *gomock.Controller
@@ -170,6 +190,21 @@ func (mr *MockMantleServiceServerMockRecorder) ListMantleBackup(arg0, arg1 any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListMantleBackup", reflect.TypeOf((*MockMantleServiceServer)(nil).ListMantleBackup), arg0, arg1)
}
+// SetSynchronizing mocks base method.
+func (m *MockMantleServiceServer) SetSynchronizing(arg0 context.Context, arg1 *SetSynchronizingRequest) (*SetSynchronizingResponse, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SetSynchronizing", arg0, arg1)
+ ret0, _ := ret[0].(*SetSynchronizingResponse)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SetSynchronizing indicates an expected call of SetSynchronizing.
+func (mr *MockMantleServiceServerMockRecorder) SetSynchronizing(arg0, arg1 any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSynchronizing", reflect.TypeOf((*MockMantleServiceServer)(nil).SetSynchronizing), arg0, arg1)
+}
+
// mustEmbedUnimplementedMantleServiceServer mocks base method.
func (m *MockMantleServiceServer) mustEmbedUnimplementedMantleServiceServer() {
m.ctrl.T.Helper()
diff --git a/test/e2e/testdata/values-mantle-primary-template.yaml b/test/e2e/testdata/values-mantle-primary-template.yaml
index b0871cdb..544397ff 100644
--- a/test/e2e/testdata/values-mantle-primary-template.yaml
+++ b/test/e2e/testdata/values-mantle-primary-template.yaml
@@ -1,3 +1,5 @@
controller:
role: primary
mantleServiceEndpoint: {ENDPOINT}
+ objectStorageBucketName: dummy
+ objectStorageEndpoint: dummy
diff --git a/test/e2e/testdata/values-mantle-secondary.yaml b/test/e2e/testdata/values-mantle-secondary.yaml
index ee299f91..53ea00ed 100644
--- a/test/e2e/testdata/values-mantle-secondary.yaml
+++ b/test/e2e/testdata/values-mantle-secondary.yaml
@@ -3,6 +3,8 @@ controller:
mantleServiceEndpoint: ":58080"
ports:
- containerPort: 58080
+ objectStorageBucketName: dummy
+ objectStorageEndpoint: dummy
secondaryService:
type: NodePort