Skip to content

Commit

Permalink
implement prepareForDataSynchronization func
Browse files Browse the repository at this point in the history
Signed-off-by: Shinya Hayashi <shinya-hayashi@cybozu.co.jp>
  • Loading branch information
peng225 committed Oct 10, 2024
1 parent 4120d38 commit 4ac9d2d
Show file tree
Hide file tree
Showing 8 changed files with 684 additions and 35 deletions.
34 changes: 34 additions & 0 deletions docs/controller-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- [CreateOrUpdateMantleBackupResponse](#proto-CreateOrUpdateMantleBackupResponse)
- [CreateOrUpdatePVCRequest](#proto-CreateOrUpdatePVCRequest)
- [CreateOrUpdatePVCResponse](#proto-CreateOrUpdatePVCResponse)
- [ListMantleBackupRequest](#proto-ListMantleBackupRequest)
- [ListMantleBackupResponse](#proto-ListMantleBackupResponse)

- [MantleService](#proto-MantleService)

Expand Down Expand Up @@ -78,6 +80,37 @@ CreateOrUpdatePVCResponse is a response message for CreateOrUpdatePVC RPC.




<a name="proto-ListMantleBackupRequest"></a>

### ListMantleBackupRequest
ListMantleBackupRequest is a request message for ListMantleBackup RPC.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| pvcUID | [string](#string) | | |
| namespace | [string](#string) | | |






<a name="proto-ListMantleBackupResponse"></a>

### ListMantleBackupResponse
ListMantleBackupResponse is a response message for ListMantleBackup RPC.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| mantleBackupList | [bytes](#bytes) | | |








Expand All @@ -94,6 +127,7 @@ CreateOrUpdatePVCResponse is a response message for CreateOrUpdatePVC RPC.
| ----------- | ------------ | ------------- | ------------|
| CreateOrUpdatePVC | [CreateOrUpdatePVCRequest](#proto-CreateOrUpdatePVCRequest) | [CreateOrUpdatePVCResponse](#proto-CreateOrUpdatePVCResponse) | |
| CreateOrUpdateMantleBackup | [CreateOrUpdateMantleBackupRequest](#proto-CreateOrUpdateMantleBackupRequest) | [CreateOrUpdateMantleBackupResponse](#proto-CreateOrUpdateMantleBackupResponse) | |
| ListMantleBackup | [ListMantleBackupRequest](#proto-ListMantleBackupRequest) | [ListMantleBackupResponse](#proto-ListMantleBackupResponse) | |



Expand Down
9 changes: 9 additions & 0 deletions internal/controller/internal/testutil/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ func (r *ResourceManager) WaitForBackupReady(ctx context.Context, backup *mantle
}).WithContext(ctx).Should(Succeed())
}

func (r *ResourceManager) WaitForBackupSyncedToRemote(ctx context.Context, backup *mantlev1.MantleBackup) {
EventuallyWithOffset(1, func(g Gomega, ctx context.Context) {
err := r.client.Get(ctx, types.NamespacedName{Name: backup.Name, Namespace: backup.Namespace}, backup)
g.Expect(err).NotTo(HaveOccurred())

g.Expect(meta.IsStatusConditionTrue(backup.Status.Conditions, mantlev1.BackupConditionSyncedToRemote)).Should(BeTrue())
}).WithContext(ctx).Should(Succeed())
}

// cf. https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example
type ObjectConstraint[T any] interface {
client.Object
Expand Down
148 changes: 138 additions & 10 deletions internal/controller/mantlebackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"os/exec"
"sort"
"syscall"
"time"

Expand All @@ -30,7 +31,12 @@ const (
labelLocalBackupTargetPVCUID = "mantle.cybozu.io/local-backup-target-pvc-uid"
labelRemoteBackupTargetPVCUID = "mantle.cybozu.io/remote-backup-target-pvc-uid"
annotRemoteUID = "mantle.cybozu.io/remote-uid"
annotDiffFrom = "mantle.cybozu.io/diff-from"
annotDiffTo = "mantle.cybozu.io/diff-to"
annotSyncMode = "mantle.cybozu.io/sync-mode"

syncModeFull = "full"
syncModeIncremental = "incremental"
)

// MantleBackupReconciler reconciles a MantleBackup object
Expand Down Expand Up @@ -370,10 +376,14 @@ func (r *MantleBackupReconciler) replicate(
if err != nil || result != (ctrl.Result{}) {
return result, err
}
prepareResult, result, err := r.prepareForDataSynchronization(ctx, backup, r.primarySettings.Client)
if err != nil || result != (ctrl.Result{}) {
prepareResult, err := r.prepareForDataSynchronization(ctx, backup, r.primarySettings.Client)
if err != nil {
return result, err
}

// FIXME: Delete this code after implementing export().
prepareResult.isSecondaryMantleBackupReadyToUse = true

if prepareResult.isSecondaryMantleBackupReadyToUse {
return r.primaryCleanup(ctx, logger, backup)
}
Expand Down Expand Up @@ -589,15 +599,133 @@ type dataSyncPrepareResult struct {
}

func (r *MantleBackupReconciler) prepareForDataSynchronization(
_ context.Context,
_ *mantlev1.MantleBackup,
_ proto.MantleServiceClient,
) (*dataSyncPrepareResult, ctrl.Result, error) { //nolint:unparam
ctx context.Context,
backup *mantlev1.MantleBackup,
msc proto.MantleServiceClient,
) (*dataSyncPrepareResult, error) {
exportTargetPVCUID := backup.GetLabels()[labelLocalBackupTargetPVCUID]
resp, err := msc.ListMantleBackup(
ctx,
&proto.ListMantleBackupRequest{
PvcUID: exportTargetPVCUID,
Namespace: backup.GetNamespace(),
},
)
if err != nil {
return nil, err
}
secondaryBackups := make([]mantlev1.MantleBackup, 0)
err = json.Unmarshal(resp.MantleBackupList, &secondaryBackups)
if err != nil {
return nil, err
}
secondaryBackupSet := convertToMap(secondaryBackups)

isSecondaryMantleBackupReadyToUse := false
secondaryBackup, ok := secondaryBackupSet[backup.GetName()]
if !ok {
return nil, fmt.Errorf("secondary MantleBackup not found: %s, %s",
backup.GetName(), backup.GetNamespace())
}
isSecondaryMantleBackupReadyToUse = meta.IsStatusConditionTrue(
secondaryBackup.Status.Conditions,
mantlev1.BackupConditionReadyToUse,
)

if syncMode, ok := backup.GetAnnotations()[annotSyncMode]; ok {
switch syncMode {
case syncModeFull:
return &dataSyncPrepareResult{
isIncremental: false,
isSecondaryMantleBackupReadyToUse: isSecondaryMantleBackupReadyToUse,
diffFrom: nil,
}, nil
case syncModeIncremental:
diffFromName, ok := backup.GetAnnotations()[annotDiffFrom]
if !ok {
return nil, fmt.Errorf(`"%s" annotation is missing`, annotDiffFrom)
}

var diffFrom mantlev1.MantleBackup
err = r.Client.Get(ctx, types.NamespacedName{
Name: diffFromName,
Namespace: backup.GetNamespace(),
}, &diffFrom)
if err != nil {
return nil, err
}

return &dataSyncPrepareResult{
isIncremental: true,
isSecondaryMantleBackupReadyToUse: isSecondaryMantleBackupReadyToUse,
diffFrom: &diffFrom,
}, nil
default:
return nil, fmt.Errorf("unknown sync mode: %s", syncMode)
}
}

var primaryBackupList mantlev1.MantleBackupList
// TODO: Perhaps, we may have to use the client without cache.
err = r.Client.List(ctx, &primaryBackupList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{labelLocalBackupTargetPVCUID: exportTargetPVCUID}),
Namespace: backup.GetNamespace(),
})
if err != nil {
return nil, err
}

diffFrom := searchForDiffOriginMantleBackup(backup, primaryBackupList.Items, secondaryBackupSet)
isIncremental := (diffFrom != nil)

return &dataSyncPrepareResult{
isIncremental: false,
isSecondaryMantleBackupReadyToUse: true,
diffFrom: nil,
}, ctrl.Result{}, nil
isIncremental: isIncremental,
isSecondaryMantleBackupReadyToUse: isSecondaryMantleBackupReadyToUse,
diffFrom: diffFrom,
}, nil
}

func convertToMap(mantleBackups []mantlev1.MantleBackup) map[string]*mantlev1.MantleBackup {
m := make(map[string]*mantlev1.MantleBackup)
for _, mantleBackup := range mantleBackups {
mantleBackup := mantleBackup
m[mantleBackup.GetName()] = &mantleBackup
}
return m
}

func searchForDiffOriginMantleBackup(
backup *mantlev1.MantleBackup,
primaryBackups []mantlev1.MantleBackup,
secondaryBackupSet map[string]*mantlev1.MantleBackup,
) *mantlev1.MantleBackup {
candidates := make([]*mantlev1.MantleBackup, 0)
for _, primaryBackup := range primaryBackups {
primaryBackup := primaryBackup
secondaryBackup, ok := secondaryBackupSet[primaryBackup.Name]
if !ok {
continue
}
if !meta.IsStatusConditionTrue(primaryBackup.Status.Conditions, mantlev1.BackupConditionReadyToUse) ||
!meta.IsStatusConditionTrue(secondaryBackup.Status.Conditions, mantlev1.BackupConditionReadyToUse) {
continue
}
if !primaryBackup.DeletionTimestamp.IsZero() || !secondaryBackup.DeletionTimestamp.IsZero() {
continue
}
if *backup.Status.SnapID <= *primaryBackup.Status.SnapID {
continue
}
candidates = append(candidates, &primaryBackup)
}
if len(candidates) == 0 {
return nil
}
// Sort by SnapID in descending order.
sort.Slice(candidates, func(i, j int) bool {
return *candidates[i].Status.SnapID > *candidates[j].Status.SnapID
})
return candidates[0]
}

func (r *MantleBackupReconciler) export(
Expand Down
Loading

0 comments on commit 4ac9d2d

Please sign in to comment.