Skip to content

Commit

Permalink
test/e2e: make sure all temporary resources are deleted after sync su…
Browse files Browse the repository at this point in the history
…cceeded

Signed-off-by: Ryotaro Banno <ryotaro.banno@gmail.com>
  • Loading branch information
ushitora-anqou committed Dec 24, 2024
1 parent 631465a commit e0d80e1
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 0 deletions.
56 changes: 56 additions & 0 deletions test/e2e/multik8s/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"os"
"reflect"
"slices"
"strings"
"testing"
"time"

Expand All @@ -16,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

mantlev1 "github.com/cybozu-go/mantle/api/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -177,6 +180,59 @@ func replicationTestSuite() {
return nil
}).Should(Succeed())

By("checking all temporary Jobs related to export and import of RBD images are removed")
primaryJobList, err := getObjectList[batchv1.JobList](primaryK8sCluster, "job", cephClusterNamespace)
Expect(err).NotTo(HaveOccurred())
Expect(slices.ContainsFunc(primaryJobList.Items, func(job batchv1.Job) bool {
n := job.GetName()
return strings.HasPrefix(n, "mantle-export-") ||
strings.HasPrefix(n, "mantle-upload-")
})).To(BeFalse())
secondaryJobList, err := getObjectList[batchv1.JobList](secondaryK8sCluster, "job", cephClusterNamespace)
Expect(err).NotTo(HaveOccurred())
Expect(slices.ContainsFunc(secondaryJobList.Items, func(job batchv1.Job) bool {
n := job.GetName()
return strings.HasPrefix(n, "mantle-import-") ||
strings.HasPrefix(n, "mantle-discard-")
})).To(BeFalse())

By("checking all temporary PVCs related to export and import of RBD images are removed")
primaryPVCList, err := getObjectList[corev1.PersistentVolumeClaimList](
primaryK8sCluster, "pvc", cephClusterNamespace)
Expect(err).NotTo(HaveOccurred())
Expect(slices.ContainsFunc(primaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool {
n := pvc.GetName()
return strings.HasPrefix(n, "mantle-export-")
})).To(BeFalse())
secondaryPVCList, err := getObjectList[corev1.PersistentVolumeClaimList](
secondaryK8sCluster, "pvc", cephClusterNamespace)
Expect(err).NotTo(HaveOccurred())
Expect(slices.ContainsFunc(secondaryPVCList.Items, func(pvc corev1.PersistentVolumeClaim) bool {
n := pvc.GetName()
return strings.HasPrefix(n, "mantle-discard-")
})).To(BeFalse())

By("checking all temporary PVs related to export and import of RBD images are removed")
secondaryPVList, err := getObjectList[corev1.PersistentVolumeList](secondaryK8sCluster, "pv", cephClusterNamespace)
Expect(err).NotTo(HaveOccurred())
Expect(slices.ContainsFunc(secondaryPVList.Items, func(pv corev1.PersistentVolume) bool {
n := pv.GetName()
return strings.HasPrefix(n, "mantle-discard-")
})).To(BeFalse())

By("checking all temporary objects in the object storage related to export and import of RBD images are removed")
objectStorageClient, err := createObjectStorageClient(ctx)
Expect(err).NotTo(HaveOccurred())
listOutput, err := objectStorageClient.listObjects(ctx)
Expect(err).NotTo(HaveOccurred())
found := false
for _, content := range listOutput.Contents {
if strings.HasPrefix(*content.Key, backupName) {
found = true
}
}
Expect(found).To(BeFalse())

By("checking MantleRestore correctly restores the replicated backups in both clusters")
for _, clusterNo := range []int{primaryK8sCluster, secondaryK8sCluster} {
clusterName := "primary"
Expand Down
107 changes: 107 additions & 0 deletions test/e2e/multik8s/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package multik8s

import (
"bytes"
"context"
_ "embed"
"encoding/json"
"errors"
Expand All @@ -12,6 +13,9 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
mantlev1 "github.com/cybozu-go/mantle/api/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -180,6 +184,26 @@ func getDeploy(clusterNo int, namespace, name string) (*appsv1.Deployment, error
return getObject[appsv1.Deployment](clusterNo, "deploy", namespace, name)
}

func getObjectList[T any](clusterNo int, kind, namespace string) (*T, error) {
var stdout []byte
var err error
if namespace == "" {
stdout, _, err = kubectl(clusterNo, nil, "get", kind, "-o", "json")
} else {
stdout, _, err = kubectl(clusterNo, nil, "get", kind, "-n", namespace, "-o", "json")
}
if err != nil {
return nil, err
}

var objList T
if err := json.Unmarshal(stdout, &objList); err != nil {
return nil, err
}

return &objList, nil
}

func changeClusterRole(clusterNo int, newRole string) error {
deployName := "mantle-controller"
deploy, err := getDeploy(clusterNo, cephClusterNamespace, deployName)
Expand Down Expand Up @@ -240,3 +264,86 @@ func changeClusterRole(clusterNo int, newRole string) error {

return nil
}

type objectStorageClient struct {
cli *s3.Client
bucketName string
}

func (c *objectStorageClient) listObjects(ctx context.Context) (*s3.ListObjectsV2Output, error) {
return c.cli.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: &c.bucketName,
})
}

func createObjectStorageClient(ctx context.Context) (*objectStorageClient, error) {
// Find the endpoint of the object storage from the command-line arguments for mantle-controller.
stdout, _, err := kubectl(primaryK8sCluster, nil,
"get", "deploy", "-n", cephClusterNamespace, "mantle-controller", "-o", "json")
if err != nil {
return nil, fmt.Errorf("failed to get deploy: %w", err)
}
var deploy appsv1.Deployment
if err := json.Unmarshal(stdout, &deploy); err != nil {
return nil, fmt.Errorf("failed to unmarshal deploy: %w", err)
}
args := deploy.Spec.Template.Spec.Containers[0].Args
endpointIndex := slices.IndexFunc(args, func(s string) bool {
return strings.HasPrefix(s, "--object-storage-endpoint=")
})
if endpointIndex == -1 {
return nil, errors.New("failed to find object storage endpoint")
}
objectStorageEndpoint, _ := strings.CutPrefix(args[endpointIndex], "--object-storage-endpoint=")

// Get the bucket name from the OBC.
stdout, _, err = kubectl(secondaryK8sCluster, nil,
"get", "obc", "-n", cephClusterNamespace, "export-data", "-o", "json")
if err != nil {
return nil, fmt.Errorf("failed to get obc: %w", err)
}
var obc struct {
Spec struct {
BucketName string `json:"bucketName"`
} `json:"spec"`
}
if err := json.Unmarshal(stdout, &obc); err != nil {
return nil, fmt.Errorf("failed to unmarshal obc: %w", err)
}

// Get the credentials from the Secret.
stdout, _, err = kubectl(secondaryK8sCluster, nil,
"get", "secret", "-n", cephClusterNamespace, "export-data", "-o", "json")
if err != nil {
return nil, fmt.Errorf("failed to get export-data secret: %w", err)
}
var secret corev1.Secret
if err := json.Unmarshal(stdout, &secret); err != nil {
return nil, fmt.Errorf("failed to unmarshal secret: %w", err)
}
awsAccessKeyID := secret.Data["AWS_ACCESS_KEY_ID"]
awsSecretAccessKey := secret.Data["AWS_SECRET_ACCESS_KEY"]

// Construct a S3 client.
sdkConfig, err := config.LoadDefaultConfig(
ctx,
config.WithRegion("ceph"),
config.WithCredentialsProvider(
aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
return aws.Credentials{
AccessKeyID: string(awsAccessKeyID),
SecretAccessKey: string(awsSecretAccessKey),
}, nil
}),
),
)
if err != nil {
return nil, fmt.Errorf("failed to load default config: %w", err)
}
s3Client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
o.BaseEndpoint = &objectStorageEndpoint
o.UsePathStyle = true
})

return &objectStorageClient{cli: s3Client, bucketName: obc.Spec.BucketName}, nil
}

0 comments on commit e0d80e1

Please sign in to comment.