From 818b5f97caa6df47316652a4fbd9c4b73425007c Mon Sep 17 00:00:00 2001 From: Dhruv Batheja Date: Wed, 19 Oct 2022 17:17:33 +0200 Subject: [PATCH] Controller tests setup (#495) --- Makefile | 11 +- .../flinkcluster_controller_test.go | 40 +++ .../flinkcluster_converter_test.go | 333 +++++++++--------- controllers/flinkcluster/suite_test.go | 14 +- 4 files changed, 226 insertions(+), 172 deletions(-) create mode 100644 controllers/flinkcluster/flinkcluster_controller_test.go diff --git a/Makefile b/Makefile index cd3eea54..5a0ccddb 100644 --- a/Makefile +++ b/Makefile @@ -11,6 +11,11 @@ RESOURCE_PREFIX ?= flink-operator- # The Kubernetes namespace to limit watching. WATCH_NAMESPACE ?= +# Env test configuration +ENVTEST_K8S_VERSION=1.25.0 +SETUP_ENVTEST_VERSION=v0.0.0-20221007015352-8ad090e0663e +LOCALBIN=$(shell pwd)/bin + all: build ##@ General @@ -56,8 +61,7 @@ vet: ## Run go vet against code. test: manifests generate fmt vet tidy kustomize envtest ## Run tests. rm -rf config/test && mkdir -p config/test/crd $(KUSTOMIZE) build config/crd > config/test/crd/flinkoperator.k8s.io_flinkclusters.yaml - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out - + KUBEBUILDER_ASSETS=$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path) go test ./... -coverprofile cover.out ##@ Build build: generate fmt vet tidy ## Build manager binary. @@ -125,7 +129,8 @@ kustomize: ## Download kustomize locally if necessary. ENVTEST = $(shell pwd)/bin/setup-envtest .PHONY: envtest envtest: ## Download envtest-setup locally if necessary. - $(call go-get-tool,$(ENVTEST),sigs.k8s.io/controller-runtime/tools/setup-envtest@latest) + $(call go-get-tool,$(ENVTEST),sigs.k8s.io/controller-runtime/tools/setup-envtest@$(SETUP_ENVTEST_VERSION)) + CRD_REF_DOCS = $(shell pwd)/bin/crd-ref-docs crd-ref-docs: diff --git a/controllers/flinkcluster/flinkcluster_controller_test.go b/controllers/flinkcluster/flinkcluster_controller_test.go new file mode 100644 index 00000000..843643f7 --- /dev/null +++ b/controllers/flinkcluster/flinkcluster_controller_test.go @@ -0,0 +1,40 @@ +package flinkcluster + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/types" + "time" +) + +var _ = Describe("FlinkCluster Controller", func() { + // Utility constants and functions here + const ( + timeout = time.Second * 10 + duration = time.Second * 10 + interval = time.Millisecond * 250 + ) + + Context("When creating a new FlinkCluster", func() { + It("Should create the JobManager Statefulset", func() { + // Test code here + By("By creating a new FlinkCluster") + dummyFlinkCluster := getDummyFlinkCluster() + Expect(k8sClient.Create(ctx, dummyFlinkCluster)).Should(Succeed()) + + expectedJobManagerName := dummyFlinkCluster.ObjectMeta.Name + "-jobmanager" + jobManagerLookupKey := types.NamespacedName{Name: expectedJobManagerName, Namespace: dummyFlinkCluster.ObjectMeta.Namespace} + createdJobManagerStatefulSet := &appsv1.StatefulSet{} + + Eventually(func() bool { + err := k8sClient.Get(ctx, jobManagerLookupKey, createdJobManagerStatefulSet) + if err != nil { + return false + } + return true + }, timeout, interval).Should(BeTrue()) + + }) + }) +}) diff --git a/controllers/flinkcluster/flinkcluster_converter_test.go b/controllers/flinkcluster/flinkcluster_converter_test.go index 5e8cbb08..3d942a26 100644 --- a/controllers/flinkcluster/flinkcluster_converter_test.go +++ b/controllers/flinkcluster/flinkcluster_converter_test.go @@ -136,198 +136,201 @@ var ( } ) -func getObservedClusterState() *ObservedClusterState { - - return &ObservedClusterState{ - cluster: &v1beta1.FlinkCluster{ - TypeMeta: metav1.TypeMeta{ - Kind: "FlinkCluster", - APIVersion: "flinkoperator.k8s.io/v1beta1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "flinkjobcluster-sample", - Namespace: "default", - }, - Spec: v1beta1.FlinkClusterSpec{ - Image: v1beta1.ImageSpec{Name: "flink:1.8.1"}, - ServiceAccountName: &serviceAccount, - Job: &v1beta1.JobSpec{ - Args: []string{"--input", "./README.txt"}, - ClassName: &className, - JarFile: &jarFile, - Parallelism: ¶llelism, - Resources: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("100m"), - corev1.ResourceMemory: resource.MustParse("256Mi"), - }, - Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("200m"), - corev1.ResourceMemory: resource.MustParse("512Mi"), - }, - }, - Mode: &jobMode, - RestartPolicy: &restartPolicy, - Volumes: []corev1.Volume{ - { - Name: "cache-volume", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }, +func getDummyFlinkCluster() *v1beta1.FlinkCluster { + return &v1beta1.FlinkCluster{ + TypeMeta: metav1.TypeMeta{ + Kind: "FlinkCluster", + APIVersion: "flinkoperator.k8s.io/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "flinkjobcluster-sample", + Namespace: "default", + }, + Spec: v1beta1.FlinkClusterSpec{ + Image: v1beta1.ImageSpec{Name: "flink:1.8.1"}, + ServiceAccountName: &serviceAccount, + Job: &v1beta1.JobSpec{ + Args: []string{"--input", "./README.txt"}, + ClassName: &className, + JarFile: &jarFile, + Parallelism: ¶llelism, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), }, - VolumeMounts: []corev1.VolumeMount{ - {Name: "cache-volume", MountPath: "/cache"}, + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), }, - InitContainers: []corev1.Container{ - { - Name: "gcs-downloader", - Image: "google/cloud-sdk", - Command: []string{"gsutil"}, - Args: []string{ - "cp", "gs://my-bucket/my-job.jar", "/cache/my-job.jar", - }, + }, + Mode: &jobMode, + RestartPolicy: &restartPolicy, + Volumes: []corev1.Volume{ + { + Name: "cache-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, - PodAnnotations: map[string]string{ - "example.com": "example", - }, - SecurityContext: &securityContext, - HostAliases: hostAliases, }, - JobManager: &v1beta1.JobManagerSpec{ - AccessScope: v1beta1.AccessScopeVPC, - Ingress: &v1beta1.JobManagerIngressSpec{ - HostFormat: &hostFormat, - Annotations: map[string]string{ - "kubernetes.io/ingress.class": "nginx", - "certmanager.k8s.io/cluster-issuer": "letsencrypt-stg", - "nginx.ingress.kubernetes.io/rewrite-target": "/", + VolumeMounts: []corev1.VolumeMount{ + {Name: "cache-volume", MountPath: "/cache"}, + }, + InitContainers: []corev1.Container{ + { + Name: "gcs-downloader", + Image: "google/cloud-sdk", + Command: []string{"gsutil"}, + Args: []string{ + "cp", "gs://my-bucket/my-job.jar", "/cache/my-job.jar", }, - UseTLS: &useTLS, }, - Ports: v1beta1.JobManagerPorts{ - RPC: &jmRPCPort, - Blob: &jmBlobPort, - Query: &jmQueryPort, - UI: &jmUIPort, + }, + PodAnnotations: map[string]string{ + "example.com": "example", + }, + SecurityContext: &securityContext, + HostAliases: hostAliases, + }, + JobManager: &v1beta1.JobManagerSpec{ + AccessScope: v1beta1.AccessScopeVPC, + Ingress: &v1beta1.JobManagerIngressSpec{ + HostFormat: &hostFormat, + Annotations: map[string]string{ + "kubernetes.io/ingress.class": "nginx", + "certmanager.k8s.io/cluster-issuer": "letsencrypt-stg", + "nginx.ingress.kubernetes.io/rewrite-target": "/", }, - LivenessProbe: &jmLivenessProbe, - ReadinessProbe: &jmReadinessProbe, - Resources: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("100m"), - corev1.ResourceMemory: resource.MustParse("256Mi"), - }, - Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("200m"), - corev1.ResourceMemory: resource.MustParse("512Mi"), - }, + UseTLS: &useTLS, + }, + Ports: v1beta1.JobManagerPorts{ + RPC: &jmRPCPort, + Blob: &jmBlobPort, + Query: &jmQueryPort, + UI: &jmUIPort, + }, + LivenessProbe: &jmLivenessProbe, + ReadinessProbe: &jmReadinessProbe, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), }, - Tolerations: tolerations, - HostAliases: hostAliases, - MemoryOffHeapRatio: &memoryOffHeapRatio, - MemoryOffHeapMin: memoryOffHeapMin, - MemoryProcessRatio: &memoryProcessRatio, - PodAnnotations: map[string]string{ - "example.com": "example", + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), }, - SecurityContext: &securityContext, }, - TaskManager: &v1beta1.TaskManagerSpec{ - DeploymentType: v1beta1.DeploymentTypeStatefulSet, - Replicas: &replicas, - Ports: v1beta1.TaskManagerPorts{ - Data: &tmDataPort, - RPC: &tmRPCPort, - Query: &tmQueryPort, + Tolerations: tolerations, + HostAliases: hostAliases, + MemoryOffHeapRatio: &memoryOffHeapRatio, + MemoryOffHeapMin: memoryOffHeapMin, + MemoryProcessRatio: &memoryProcessRatio, + PodAnnotations: map[string]string{ + "example.com": "example", + }, + SecurityContext: &securityContext, + }, + TaskManager: &v1beta1.TaskManagerSpec{ + DeploymentType: v1beta1.DeploymentTypeStatefulSet, + Replicas: &replicas, + Ports: v1beta1.TaskManagerPorts{ + Data: &tmDataPort, + RPC: &tmRPCPort, + Query: &tmQueryPort, + }, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), }, - Resources: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("200m"), - corev1.ResourceMemory: resource.MustParse("512Mi"), - }, - Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: resource.MustParse("500m"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), }, - LivenessProbe: &tmLivenessProbe, - ReadinessProbe: &tmReadinessProbe, - MemoryOffHeapRatio: &memoryOffHeapRatio, - MemoryOffHeapMin: memoryOffHeapMin, - MemoryProcessRatio: &memoryProcessRatio, - Sidecars: []corev1.Container{{Name: "sidecar", Image: "alpine"}}, - Volumes: []corev1.Volume{ - { - Name: "cache-volume", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + LivenessProbe: &tmLivenessProbe, + ReadinessProbe: &tmReadinessProbe, + MemoryOffHeapRatio: &memoryOffHeapRatio, + MemoryOffHeapMin: memoryOffHeapMin, + MemoryProcessRatio: &memoryProcessRatio, + Sidecars: []corev1.Container{{Name: "sidecar", Image: "alpine"}}, + Volumes: []corev1.Volume{ + { + Name: "cache-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, - VolumeMounts: []corev1.VolumeMount{ - {Name: "cache-volume", MountPath: "/cache"}, - }, - VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc-test", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "flinkoperator.k8s.io/v1beta1", - Kind: "FlinkCluster", - Name: "flinkjobcluster-sample", - Controller: &controller, - BlockOwnerDeletion: &blockOwnerDeletion, - }, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "cache-volume", MountPath: "/cache"}, + }, + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "flinkoperator.k8s.io/v1beta1", + Kind: "FlinkCluster", + Name: "flinkjobcluster-sample", + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, }, }, - Spec: corev1.PersistentVolumeClaimSpec{ - AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, - Resources: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceStorage: resource.MustParse("100Gi"), - }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: resource.MustParse("100Gi"), }, - StorageClassName: &storageClassName, }, + StorageClassName: &storageClassName, }, }, - Tolerations: tolerations, - HostAliases: hostAliases, - PodAnnotations: map[string]string{ - "example.com": "example", - }, - SecurityContext: &securityContext, - }, - EnvVars: []corev1.EnvVar{{Name: "FOO", Value: "abc"}}, - EnvFrom: []corev1.EnvFromSource{{ConfigMapRef: &corev1.ConfigMapEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "FOOMAP", - }}}}, - HadoopConfig: &v1beta1.HadoopConfig{ - ConfigMapName: "hadoop-configmap", - MountPath: "/etc/hadoop/conf", }, - GCPConfig: &v1beta1.GCPConfig{ - ServiceAccount: &v1beta1.GCPServiceAccount{ - SecretName: "gcp-service-account-secret", - KeyFile: "gcp_service_account_key.json", - MountPath: "/etc/gcp_service_account/", - }, + Tolerations: tolerations, + HostAliases: hostAliases, + PodAnnotations: map[string]string{ + "example.com": "example", }, - LogConfig: map[string]string{ - "extra-file.txt": "hello!", - "log4j-console.properties": "foo", - "logback-console.xml": "bar", - "log4j-cli.properties": "baz", + SecurityContext: &securityContext, + }, + EnvVars: []corev1.EnvVar{{Name: "FOO", Value: "abc"}}, + EnvFrom: []corev1.EnvFromSource{{ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "FOOMAP", + }}}}, + HadoopConfig: &v1beta1.HadoopConfig{ + ConfigMapName: "hadoop-configmap", + MountPath: "/etc/hadoop/conf", + }, + GCPConfig: &v1beta1.GCPConfig{ + ServiceAccount: &v1beta1.GCPServiceAccount{ + SecretName: "gcp-service-account-secret", + KeyFile: "gcp_service_account_key.json", + MountPath: "/etc/gcp_service_account/", }, }, - Status: v1beta1.FlinkClusterStatus{ - Revision: v1beta1.RevisionStatus{NextRevision: "flinkjobcluster-sample-85dc8f749-1"}, + LogConfig: map[string]string{ + "extra-file.txt": "hello!", + "log4j-console.properties": "foo", + "logback-console.xml": "bar", + "log4j-cli.properties": "baz", }, }, + Status: v1beta1.FlinkClusterStatus{ + Revision: v1beta1.RevisionStatus{NextRevision: "flinkjobcluster-sample-85dc8f749-1"}, + }, + } +} + +func getObservedClusterState() *ObservedClusterState { + return &ObservedClusterState{ + cluster: getDummyFlinkCluster(), } } diff --git a/controllers/flinkcluster/suite_test.go b/controllers/flinkcluster/suite_test.go index 56757523..a3f05f55 100644 --- a/controllers/flinkcluster/suite_test.go +++ b/controllers/flinkcluster/suite_test.go @@ -18,11 +18,13 @@ package flinkcluster import ( "context" + "path/filepath" + "testing" + "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "path/filepath" ctrl "sigs.k8s.io/controller-runtime" - "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -98,7 +100,11 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { cancel() + ctx.Done() By("tearing down the test environment") - err := testEnv.Stop() - Expect(err).ToNot(HaveOccurred()) + timeout := 30 * time.Second + poll := 5 * time.Second + Eventually(func() error { + return testEnv.Stop() + }, timeout, poll).ShouldNot(HaveOccurred()) })