diff --git a/.gitignore b/.gitignore index 0d445c4291b..7524fe516ea 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,8 @@ cmd/*/start.sh openapi-gen sdk **/__debug_bin + +# debug related +**/__debug_bin +**/__pycache__ + diff --git a/test/prow/access_oss_via_jindorumtime.py b/test/prow/access_oss_via_jindorumtime.py deleted file mode 100644 index 04c76f361e3..00000000000 --- a/test/prow/access_oss_via_jindorumtime.py +++ /dev/null @@ -1,261 +0,0 @@ -""" -TestCase: Access OSS via JindoRuntime -DDC Engine: JindoFS -Prerequisites: -1. Fluid need to enable JindoRuntime. -2. There is a Secret named 'e2e-test-jindoruntime-secret' -Steps: -1. Create Dataset and JindoRuntime -2. Check if dataset is bound -3. Check if PersistentVolumeClaim & PV is created -4. Submit data read job -5. Wait until data read job completes -6. Clean up -""" -from kubernetes import client, config -import time - -namespace = 'default' -dataset_name = 'oss-bucket' -runtime_name = 'oss-bucket' -secret_name = 'e2e-test-jindoruntime-secret' -oss_bucket_name = 'fluid-e2e' -oss_bucket_endpoint = 'oss-cn-hongkong-internal.aliyuncs.com' - -def create_dataset_and_jindo_runtime(): - api = client.CustomObjectsApi() - dataset = { - 'apiVersion': 'data.fluid.io/v1alpha1', - 'kind': 'Dataset', - 'metadata': { - 'name': dataset_name - }, - 'spec': { - 'mounts': [ - { - 'mountPoint': 'oss://' + oss_bucket_name, - 'options': { - 'fs.oss.endpoint': oss_bucket_endpoint - }, - 'name': 'ossbucket', - 'encryptOptions': [ - { - 'name': 'fs.oss.accessKeyId', - 'valueFrom': { - 'secretKeyRef': { - 'name': secret_name, - 'key': 'fs.oss.accessKeyId' - } - } - }, - { - 'name': 'fs.oss.accessKeySecret', - 'valueFrom': { - 'secretKeyRef': { - 'name': secret_name, - 'key': 'fs.oss.accessKeySecret' - } - } - } - ] - } - ] - } - } - - jindo_runtime = { - 'apiVersion': 'data.fluid.io/v1alpha1', - 'kind': 'JindoRuntime', - 'metadata': { - 'name': runtime_name - }, - 'spec': { - 'replicas': 1, - 'tieredstore': { - 'levels': [ - 
{ - 'mediumtype': 'MEM', - 'path': '/dev/shm', - 'quota': '10G', - 'high': '0.99', - 'low': '0.98' - } - ] - } - } - } - - api.create_namespaced_custom_object( - group = 'data.fluid.io', - version = 'v1alpha1', - namespace = namespace, - plural = 'datasets', - body = dataset - ) - - print("Dataset created.") - - api.create_namespaced_custom_object( - group = 'data.fluid.io', - version = 'v1alpha1', - namespace = namespace, - plural = 'jindoruntimes', - body = jindo_runtime - ) - - print("JindoRuntime created.") - - return - -def check_dataset_is_bound(): - api = client.CustomObjectsApi() - while True: - resource = api.get_namespaced_custom_object( - group = 'data.fluid.io', - version = 'v1alpha1', - name = dataset_name, - namespace = namespace, - plural = 'datasets' - ) - print(resource) - if 'status' in resource: - if 'phase' in resource['status']: - if resource['status']['phase'] == 'Bound': - print('Dataset status is bound.') - break - time.sleep(1) - return - -def check_pvc_and_pv_is_created(): - while True: - try: - client.CoreV1Api().read_persistent_volume(name = namespace + '-' + dataset_name) - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim(name = dataset_name, namespace = namespace) - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - print('PersistentVolume & PersistentVolumeClaim Ready.') - break - return - -def submit_data_read_job(): - api = client.BatchV1Api() - - container = client.V1Container( - name = 'busybox', - image = 'busybox', - command = ['/bin/sh'], - args = ['-c', 'set -x; time cp -r /data/ossbucket ./'], - volume_mounts = [ - client.V1VolumeMount(mount_path = '/data', name = 'oss-bucket-vol') - ] - ) - - template = client.V1PodTemplateSpec( - metadata = client.V1ObjectMeta(labels = {'app': 'dataread'}), - spec = client.V1PodSpec( - restart_policy = 'Never', - containers = 
[container], - volumes = [ - client.V1Volume( - name = 'oss-bucket-vol', - persistent_volume_claim = client.V1PersistentVolumeClaimVolumeSource(claim_name = dataset_name) - ) - ] - ) - ) - - spec = client.V1JobSpec( - template = template, - backoff_limit = 4 - ) - - job = client.V1Job( - api_version = 'batch/v1', - kind = 'Job', - metadata = client.V1ObjectMeta(name = 'fluid-copy-test'), - spec = spec - ) - - api.create_namespaced_job(namespace = namespace, body = job) - print('Job created.') - return - -def check_data_read_job_status(): - api = client.BatchV1Api() - - job_completed = False - while not job_completed: - response = api.read_namespaced_job_status( - name = 'fluid-copy-test', - namespace = namespace - ) - - if response.status.succeeded is not None or \ - response.status.failed is not None: - job_completed = True - - time.sleep(1) - - print('Data Read Job done.') - return - -def clean_up(): - batch_api = client.BatchV1Api() - - # Delete Data Read Job - - # See https://github.com/kubernetes-client/python/issues/234 - body = client.V1DeleteOptions(propagation_policy = 'Background') - batch_api.delete_namespaced_job(name = 'fluid-copy-test', namespace = namespace, body = body) - - - custom_api = client.CustomObjectsApi() - - custom_api.delete_namespaced_custom_object( - group = 'data.fluid.io', - version = 'v1alpha1', - name = dataset_name, - namespace = namespace, - plural = 'datasets' - ) - - runtimeDelete = False - while not runtimeDelete: - print('Runtime still exists...') - try: - runtime = custom_api.get_namespaced_custom_object( - group = 'data.fluid.io', - version = 'v1alpha1', - name = runtime_name, - namespace = namespace, - plural = 'jindoruntimes' - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - return - -def init_k8s_client(): - config.load_incluster_config() - -if __name__ == '__main__': - init_k8s_client() - create_dataset_and_jindo_runtime() - 
check_dataset_is_bound() - check_pvc_and_pv_is_created() - submit_data_read_job() - check_data_read_job_status() - clean_up() diff --git a/test/prow/access_web_ufs.py b/test/prow/access_web_ufs.py deleted file mode 100644 index 56717c76c23..00000000000 --- a/test/prow/access_web_ufs.py +++ /dev/null @@ -1,235 +0,0 @@ -""" -TestCase: Access WebUFS data -DDC Engine: Alluxio -Steps: -1. create Dataset(WebUFS) & Runtime -2. check if dataset is bound -3. check if persistentVolumeClaim & PV is created -4. submit data read job -5. wait until data read job completes -6. clean up -""" - -import time - -from kubernetes import client, config - - -def createDatasetAndRuntime(): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": "hbase"}, - "spec": { - "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/zookeeper/stable/", "name": "hbase"}] - } - } - - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": {"name": "hbase"}, - "spec": { - "replicas": 1, - "podMetadata": { - "labels": { - "foo": "bar" - } - }, - "master": { - "podMetadata": { - "labels": { - "foo": "bar2", - "test1": "master-value", - } - } - }, - "worker": { - "podMetadata": { - "labels": { - "foo": "bar2", - "test1": "worker-value", - } - } - }, - "tieredstore": { - "levels": [{ - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - }] - } - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=my_dataset, - ) - - print("Created dataset.") - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="alluxioruntimes", - body=my_alluxioruntime - ) - - print("Created alluxioruntime.") - - -def checkDatasetBound(): - api = client.CustomObjectsApi() - - while True: - resource = 
api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - print(resource) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - break - time.sleep(1) - print(resource) - - -def checkVolumeResourcesReady(): - while True: - try: - client.CoreV1Api().read_persistent_volume(name="default-hbase") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim(name="hbase", namespace="default") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - print("PersistentVolume & PersistentVolumeClaim Ready.") - break - - -def createDataReadJob(): - api = client.BatchV1Api() - - container = client.V1Container( - name="busybox", - image="busybox", - command=["/bin/sh"], - args=["-c", "set -x; time cp -r /data/hbase ./"], - volume_mounts=[client.V1VolumeMount(mount_path="/data", name="hbase-vol")] - ) - - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": "dataread"}), - spec=client.V1PodSpec(restart_policy="Never", containers=[container], volumes=[client.V1Volume(name="hbase-vol", - persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( - claim_name="hbase"))]) - ) - - spec = client.V1JobSpec( - template=template, - backoff_limit=4 - ) - - job = client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name="fluid-copy-test"), - spec=spec - ) - - api.create_namespaced_job(namespace="default", body=job) - print("Job created.") - - -def checkDataReadJobStatus(): - api = client.BatchV1Api() - - job_completed = False - while not job_completed: - response = api.read_namespaced_job_status( - name="fluid-copy-test", - namespace="default" - ) - - if response.status.succeeded is not None or \ - response.status.failed is not 
None: - job_completed = True - - time.sleep(1) - - print("Data Read Job done.") - -def cleanUp(): - batch_api = client.BatchV1Api() - - # Delete Data Read Job - - # See https://github.com/kubernetes-client/python/issues/234 - body = client.V1DeleteOptions(propagation_policy='Background') - batch_api.delete_namespaced_job(name="fluid-copy-test", namespace="default", body=body) - - custom_api = client.CustomObjectsApi() - - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - runtimeDelete = False - while not runtimeDelete: - print("runtime still exists...") - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - -def main(): - # config.load_kube_config() - config.load_incluster_config() - - createDatasetAndRuntime() - checkDatasetBound() - checkVolumeResourcesReady() - createDataReadJob() - checkDataReadJobStatus() - cleanUp() - - -if __name__ == '__main__': - main() - diff --git a/test/prow/alluxio_dynamic_mountpoint.py b/test/prow/alluxio_dynamic_mountpoint.py deleted file mode 100644 index 3085dc543a8..00000000000 --- a/test/prow/alluxio_dynamic_mountpoint.py +++ /dev/null @@ -1,293 +0,0 @@ -""" -TestCase: Alluxio dynamic changes mountpoints -DDC Engine: Alluxio -Steps: -1. create Dataset(WebUFS) & Runtime with two mountpoint -2. check if dataset is bound -3. check if persistentVolumeClaim & PV is created -4. check alluxioruntime mountpoint and data -5. change dataset mountpoint and update -6. check dataset is bound and mountpoint change -7. check if alluxio master recover after crash -8. 
clean up -""" - -import time - -from kubernetes import client, config -from kubernetes.stream import stream - - -def createDatasetAndRuntime(): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": { - "name": "hbase" - }, - "spec": { - "mounts": [ - { - "mountPoint": "https://mirrors.bit.edu.cn/apache/hbase/stable/", - "name": "hbase" - }, - { - "mountPoint": "https://mirrors.bit.edu.cn/apache/hadoop/common/stable/", - "name": "hadoop" - } - ] - } - } - - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": { - "name": "hbase" - }, - "spec": { - "replicas": 2, - "tieredstore": { - "levels": [ - { - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - } - ] - } - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=my_dataset, - ) - - print("Created dataset.") - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="alluxioruntimes", - body=my_alluxioruntime - ) - - print("Created alluxioruntime.") - - -def checkDatasetBound(): - api = client.CustomObjectsApi() - - while True: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - print(resource) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - break - time.sleep(1) - - -def checkVolumeResourcesReady(): - while True: - try: - client.CoreV1Api().read_persistent_volume(name="default-hbase") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim( - name="hbase", namespace="default") - except client.exceptions.ApiException as 
e: - if e.status == 404: - time.sleep(1) - continue - - print("PersistentVolume & PersistentVolumeClaim Ready.") - break - - -def checkAlluxioruntimeMountpoint(dataset1, dataset2): - exec_command = ["/bin/sh", - "-c", - "alluxio fs mount"] - resp = stream( - client.CoreV1Api().connect_get_namespaced_pod_exec, "hbase-master-0", "default", - command=exec_command, stderr=True, stdin=False, - stdout=True, tty=False, container='alluxio-master') - print("Response: " + resp) - if dataset1 not in resp or dataset2 not in resp: - print("checkAlluxioruntimeMountpoint Failed") - return 1 - - -def changeDatasetMountpoint(): - new_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": { - "name": "hbase" - }, - "spec": { - "mounts": [ - { - "mountPoint": "https://mirrors.bit.edu.cn/apache/hbase/stable/", - "name": "hbase" - }, - { - "mountPoint": "https://mirrors.bit.edu.cn/apache/zookeeper/stable/", - "name": "zookeeper" - } - ] - } - } - - new_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": { - "name": "hbase" - }, - "spec": { - "replicas": 2, - "tieredstore": { - "levels": [ - { - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - } - ] - } - } - } - - client.CustomObjectsApi().patch_namespaced_custom_object( - name="hbase", - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=new_dataset, - ) - - client.CustomObjectsApi().patch_namespaced_custom_object( - name="hbase", - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="alluxioruntimes", - body=new_alluxioruntime, - ) - - time.sleep(1) - while True: - resource = client.CustomObjectsApi().get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - print(resource) - - if "status" in resource: - if "mounts" in resource["status"]: - 
print(resource["status"]["mounts"]) - if resource["status"]["mounts"][0]["name"] == "zookeeper" or resource["status"]["mounts"][1]["name"] == "zookeeper": - break - - time.sleep(1) - - -def checkRecoverAfterCrash(): - # exec the master pod and kill - exec_command = ["/bin/sh", - "-c", - "kill 1"] - resp = stream( - client.CoreV1Api().connect_get_namespaced_pod_exec, "hbase-master-0", "default", - command=exec_command, stderr=True, stdin=False, - stdout=True, tty=False, container='alluxio-master') - print("Response: " + resp) - - api = client.CoreV1Api() - time.sleep(1) - response = api.read_namespaced_pod( - name="hbase-master-0", namespace="default") - while response.status.phase != "Running": - time.sleep(1) - response = api.read_namespaced_pod( - name="hbase-master-0", namespace="default") - print(response) - - -def cleanUp(): - custom_api = client.CustomObjectsApi() - - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - runtimeDelete = False - while not runtimeDelete: - print("runtime still exists...") - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - -def main(): - # config.load_kube_config() - config.load_incluster_config() - - createDatasetAndRuntime() - checkDatasetBound() - checkVolumeResourcesReady() - res_check_mountpoint0 = checkAlluxioruntimeMountpoint("hbase", "hadoop") - changeDatasetMountpoint() - res_check_mountpoint1 = checkAlluxioruntimeMountpoint("hbase", "zookeeper") - checkRecoverAfterCrash() - cleanUp() - if res_check_mountpoint0 == 1 or res_check_mountpoint1 == 1: - exit(-1) - - -if __name__ == "__main__": - main() diff --git a/test/prow/alluxioruntime_resources_setting.py 
b/test/prow/alluxioruntime_resources_setting.py deleted file mode 100644 index 5403515f09f..00000000000 --- a/test/prow/alluxioruntime_resources_setting.py +++ /dev/null @@ -1,311 +0,0 @@ -""" -TestCase: Resources setting for alluxio runtime -DDC Engine: Alluxio -Steps: -1. create Dataset(WebUFS) & Runtime with specified resource -2. check if dataset is bound -3. check if persistentVolumeClaim & PV is created -4. submit data read job -5. check if alluxio runtime resources are consistent with expected -6. clean up -""" - -import time - -from kubernetes import client, config - - -def createDatasetAndRuntime(): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": "hbase"}, - "spec": { - "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/hbase/stable/", "name": "hbase"}] - } - } - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": { - "name": "hbase" - }, - "spec": { - "replicas": 1, - "tieredstore": { - "levels": [ - { - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - } - ] - }, - "master": { - "resources": { - "requests": { - "cpu": "1000m", - "memory": "4Gi" - }, - "limits": { - "cpu": "2000m", - "memory": "8Gi" - } - } - }, - "jobMaster": { - "resources": { - "requests": { - "cpu": "1500m", - "memory": "4Gi" - }, - "limits": { - "cpu": "2000m", - "memory": "8Gi" - } - } - }, - "worker": { - "resources": { - "requests": { - "cpu": "1000m", - "memory": "4Gi" - }, - "limits": { - "cpu": "2000m", - "memory": "8Gi" - } - } - }, - "jobWorker": { - "resources": { - "requests": { - "cpu": "1000m", - "memory": "4Gi" - }, - "limits": { - "cpu": "2000m", - "memory": "8Gi" - } - } - }, - "fuse": { - "resources": { - "requests": { - "cpu": "1000m", - "memory": "4Gi" - }, - "limits": { - "cpu": "2000m", - "memory": "8Gi" - } - } - } - } - } - api.create_namespaced_custom_object( - 
group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=my_dataset, - ) - - print("Created dataset.") - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="alluxioruntimes", - body=my_alluxioruntime - ) - - print("Created alluxioruntime.") - - -def checkDatasetBound(): - api = client.CustomObjectsApi() - - while True: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - print(resource) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - break - time.sleep(1) - print(resource) - - -def checkVolumeResourcesReady(): - while True: - try: - client.CoreV1Api().read_persistent_volume(name="default-hbase") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim( - name="hbase", namespace="default") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - print("PersistentVolume & PersistentVolumeClaim Ready.") - break - - -def createDataReadJob(): - api = client.BatchV1Api() - - container = client.V1Container( - name="busybox", - image="busybox", - command=["/bin/sh"], - volume_mounts=[client.V1VolumeMount( - mount_path="/data", name="hbase-vol")] - ) - - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": "dataread"}), - spec=client.V1PodSpec(restart_policy="Never", containers=[container], volumes=[client.V1Volume(name="hbase-vol", - persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( - claim_name="hbase"))]) - ) - - spec = client.V1JobSpec( - template=template, - backoff_limit=4 - ) - - job = client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name="fluid-copy-test"), - 
spec=spec - ) - - api.create_namespaced_job(namespace="default", body=job) - print("Job created.") - - -def checkAlluxioruntimeResource(): - api = client.CoreV1Api() - - response = api.read_namespaced_pod(name="hbase-master-0", namespace="default") - master_resource_check = True - for container in response.spec.containers: - if container.name == "master": # master - if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ - container.resources.requests["cpu"] == "1" and container.resources.requests["memory"] == "4Gi": - continue - else: - master_resource_check = False - elif container.name == "jobmaster": # jobmaster - if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ - container.resources.requests["cpu"] == "1500m" and container.resources.requests["memory"] == "4Gi": - continue - else: - master_resource_check = False - - if master_resource_check: - print("Master Resource Check Pass") - - response = api.read_namespaced_pod(name="hbase-worker-0", namespace="default") - worker_resource_check = True - for container in response.spec.containers: - if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ - container.resources.requests["cpu"] == "1" and container.resources.requests["memory"] == "4Gi": - continue - else: - worker_resource_check = False - - if worker_resource_check: - print("Worker Resource Check Pass") - - # pod_list = api.list_namespaced_pod(namespace="default") - # fuse_resource_check = True - # for pod in pod_list.items: - # if "fuse" in pod.metadata.name: - # for container in pod.spec.containers: - # if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ - # container.resources.requests["cpu"] == "1" and container.resources.requests["memory"] == "4Gi": - # continue - # else: - # fuse_resource_check = False - # if fuse_resource_check: - # print("Fuse Resource Check Pass") - - 
if not master_resource_check & worker_resource_check: - return 1 - -def cleanUp(): - batch_api = client.BatchV1Api() - - # Delete Data Read Job - - # See https://github.com/kubernetes-client/python/issues/234 - body = client.V1DeleteOptions(propagation_policy='Background') - batch_api.delete_namespaced_job( - name="fluid-copy-test", namespace="default", body=body) - - custom_api = client.CustomObjectsApi() - - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="datasets" - ) - - runtimeDelete = False - while not runtimeDelete: - print("runtime still exists...") - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace="default", - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - -def main(): - # config.load_kube_config() - config.load_incluster_config() - - createDatasetAndRuntime() - checkDatasetBound() - checkVolumeResourcesReady() - createDataReadJob() - res = checkAlluxioruntimeResource() - cleanUp() - if res == 1: - exit(-1) - - -if __name__ == '__main__': - main() diff --git a/test/prow/csi_stable_patch.py b/test/prow/csi_stable_patch.py deleted file mode 100644 index 99cc192cdcc..00000000000 --- a/test/prow/csi_stable_patch.py +++ /dev/null @@ -1,215 +0,0 @@ -""" -TestCase: CSI Plugin Stale Node Patch Verification -DDC Engine: Alluxio -Steps: -1. create Dataset & Runtime -2. check if dataset is bound -3. create app pod -4. check app pod is running -5. add node label -6. delete app pod -7. create app pod again -8. check app pod is running -9. check added label exist -10. 
clean up -""" -import time -from kubernetes import client, config - -NS = "default" -def createDatasetAndRuntime(): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": "hbase", "namespace": NS}, - "spec": { - "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/spark/", - "name": "hbase"}] - } - } - - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": {"name": "hbase", "namespace": NS}, - "spec": { - "replicas": 1, - "tieredstore": { - "levels": [{ - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - }] - } - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace=NS, - plural="datasets", - body=my_dataset, - ) - - print("Created dataset.") - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace=NS, - plural="alluxioruntimes", - body=my_alluxioruntime - ) - - print("Created alluxioruntime.") - -def checkDatasetBound(): - api = client.CustomObjectsApi() - - while True: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=NS, - plural="datasets" - ) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - break - print("Not bound.") - time.sleep(1) - # print(resource) - - -def createApp(): - api = client.CoreV1Api() - my_app = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": "nginx", "namespace": NS}, - "spec": { - "containers": [ - { - "name": "nginx", - "image": "nginx", - "volumeMounts": [{"mountPath": "/data", "name": "hbase-vol"}] - } - ], - "volumes": [{ - "name": "hbase-vol", - "persistentVolumeClaim": { - "claimName": "hbase" - } - }] - } - } - api.create_namespaced_pod(NS, my_app) - print("Create pod.") - -def checkAppRun(): - api = 
client.CoreV1Api() - while True: - resource = api.read_namespaced_pod("nginx", NS) - if (resource.status.phase == "Running"): - print("App running.") - print(resource.spec) - return resource.spec.node_name - print("App pod is not running.") - time.sleep(1) - -def addLabel(node_name): - api = client.CoreV1Api() - resource = api.read_node(node_name) - resource.metadata.labels['test-stale'] = 'true' - api.patch_node(node_name, resource) - print("Add node label.") - -def deleteApp(): - api = client.CoreV1Api() - api.delete_namespaced_pod("nginx", NS) - while True: - try: - resource = api.read_namespaced_pod("nginx", NS) - print("App pod still exists...") - time.sleep(1) - except client.exceptions.ApiException as e: - if e.status == 404: - print("Delete pod.") - return - -def checkLabel(node_name): - api = client.CoreV1Api() - resource = api.read_node(node_name) - if (resource.metadata.labels['test-stale'] and resource.metadata.labels['test-stale'] == 'true'): - print("Added label exists.") - return True - else: - print("Added label does not exist.") - return False - -def cleanUp(node_name): - deleteApp() - - custom_api = client.CustomObjectsApi() - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=NS, - plural="datasets" - ) - - runtimeDelete = False - while not runtimeDelete: - print("runtime still exists...") - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=NS, - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - api = client.CoreV1Api() - resource = api.read_node(node_name) - resource.metadata.labels['test-stale'] = None - api.patch_node(node_name, resource) - - print("Delete added label.") - - -def main(): - config.load_incluster_config() - - createDatasetAndRuntime() - checkDatasetBound() - createApp() - 
node_name = checkAppRun() - addLabel(node_name) - deleteApp() - createApp() - checkAppRun() - res = checkLabel(node_name) - cleanUp(node_name) - print("Has passed? " + str(True)) - if not res: - exit(-1) - return 0 - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/test/prow/fluid/fluidapi.py b/test/prow/fluid/fluidapi.py new file mode 100644 index 00000000000..3b03aa807d9 --- /dev/null +++ b/test/prow/fluid/fluidapi.py @@ -0,0 +1,260 @@ +API_VERSION = "data.fluid.io/v1alpha1" +webufs = "https://mirrors.bit.edu.cn/apache/zookeeper/stable/" + +oss_bucket = "oss://fluid-e2e" +oss_endpoint = "oss-cn-hongkong-internal.aliyuncs.com" + +minio_svc = "minio" + + +class K8sObject(): + def __init__(self): + self.resource = {} + + def set_kind(self, api_version, kind): + self.resource["apiVersion"] = api_version + self.resource["kind"] = kind + + return self + + def set_namespaced_name(self, namespace, name): + if "metadata" not in self.resource: + self.resource["metadata"] = {} + + self.resource["metadata"]["namespace"] = namespace + self.resource["metadata"]["name"] = name + + return self + + def dump(self): + return self.resource + + +class Mount(): + def __init__(self): + self.str = {} + + def set_mount_info(self, name, mount_point, path=""): + self.str["mountPoint"] = mount_point + self.str["name"] = name + self.str["path"] = path + + def add_options(self, key, value): + if "options" not in self.str: + self.str["options"] = {} + + self.str["options"][key] = value + + def add_encrypt_options(self, key, secretName, secretKey): + if "encryptOptions" not in self.str: + self.str["encryptOptions"] = [] + + self.str["encryptOptions"].append( + { + "name": key, + "valueFrom": { + "secretKeyRef": { + "name": secretName, + "key": secretKey + } + } + } + ) + + def dump(self): + return self.str + + +class Dataset(K8sObject): + def __init__(self, name, namespace="default"): + super().__init__() + self.set_kind("data.fluid.io/v1alpha1", "Dataset") + 
self.set_namespaced_name(namespace, name) + + def add_mount(self, mount): + if "spec" not in self.resource: + self.resource["spec"] = {} + if "mounts" not in self.resource["spec"]: + self.resource["spec"]["mounts"] = [] + + self.resource["spec"]["mounts"].append(mount) + return self + + def set_placement(self, placement): + if "spec" not in self.resource: + self.resource["spec"] = {} + self.resource["spec"]["placement"] = placement + return self + + def set_access_mode(self, mode): + if "spec" not in self.resource: + self.resource["spec"] = {} + if "accessModes" not in self.resource["spec"]: + self.resource["spec"]["accessModes"] = [] + + self.resource["spec"]["accessModes"].append(mode) + return self + + def set_node_affinity(self, key, value): + if "spec" not in self.resource: + self.resource["spec"] = {} + if "nodeAffinity" not in self.resource["spec"]: + self.resource["spec"]["nodeAffinity"] = { + "required": { + "nodeSelectorTerms": [{ + "matchExpressions": [{ + "key": key, + "operator": "In", + "values": [value] + }] + }] + } + } + + return self + + + +class Runtime(K8sObject): + def __init__(self, kind, name, namespace="default"): + super().__init__() + self.set_kind("data.fluid.io/v1alpha1", kind) + self.set_namespaced_name(namespace, name) + + def set_replicas(self, replica_num): + if "spec" not in self.resource: + self.resource["spec"] = {} + + self.resource["spec"]["replicas"] = replica_num + return self + + def set_tieredstore(self, mediumtype, path, quota="", quota_list="", high="0.99", low="0.99"): + if "spec" not in self.resource: + self.resource["spec"] = {} + + self.resource["spec"]["tieredstore"] = { + "levels": [{ + "mediumtype": mediumtype, + "path": path, + "high": high, + "low": low + }] + } + + if len(quota) != 0: + self.resource["spec"]["tieredstore"]["levels"][0]["quota"] = quota + if len(quota_list) != 0: + self.resource["spec"]["tieredstore"]["levels"][0]["quota"] = quota_list + + return self + + +class DataLoad(K8sObject): + def 
__init__(self, name, namespace="default"): + super().__init__() + self.set_kind(API_VERSION, "DataLoad") + self.set_namespaced_name(namespace, name) + + def set_target_dataset(self, dataset_name, dataset_namespace="default"): + if "spec" not in self.resource: + self.resource["spec"] = {} + + if "dataset" not in self.resource["spec"]: + self.resource["spec"]["dataset"] = {} + + self.resource["spec"]["dataset"]["name"] = dataset_name + self.resource["spec"]["dataset"]["namespace"] = dataset_namespace + + return self + + def set_load_metadata(self, should_load): + if "spec" not in self.resource: + self.resource["spec"] = {} + + self.resource["spec"]["loadMetadata"] = should_load + + return self + + + +def assemble_dataset(testcase): + if testcase == "alluxio-webufs": + mount = Mount() + mount.set_mount_info("zookeeper", webufs) + + dataset = Dataset("hbase") + dataset.add_mount(mount.dump()) + + return dataset + + elif testcase == "alluxio-oss": + mount = Mount() + mount.set_mount_info("demo", oss_bucket) + mount.add_options("fs.oss.endpoint", oss_endpoint) + mount.add_encrypt_options("fs.oss.accessKeyId", "access-key", "fs.oss.accessKeyId") + mount.add_encrypt_options("fs.oss.accessKeySecret", "access-key", "fs.oss.accessKeySecret") + + dataset = Dataset("alluxio-demo-dataset") + dataset.add_mount(mount.dump()) + + return dataset + + elif testcase == "jindo-oss": + mount = Mount() + mount.set_mount_info("demo", oss_bucket, "/") + mount.add_options("fs.oss.endpoint", oss_endpoint) + mount.add_encrypt_options("fs.oss.accessKeyId", "access-key", "fs.oss.accessKeyId") + mount.add_encrypt_options("fs.oss.accessKeySecret", "access-key", "fs.oss.accessKeySecret") + + dataset = Dataset("demo-dataset") + dataset.add_mount(mount.dump()) + + return dataset + + elif testcase == "juicefs-minio": + mount = Mount() + mount.set_mount_info("juicefs-community", "juicefs:///") + mount.add_options("bucket", "https://%s:9000/minio/test" % minio_svc) + mount.add_options("storage", 
"minio") + mount.add_encrypt_options("metaurl", "jfs-secret", "metaurl") + mount.add_encrypt_options("access-key", "jfs-secret", "accesskey") + mount.add_encrypt_options("secret-key", "jfs-secret", "secretkey") + + dataset = Dataset("jfsdemo") + dataset.add_mount(mount.dump()) + dataset.set_access_mode(mode="ReadWriteMany") + + return dataset + + +def assemble_runtime(testcase): + if testcase == "alluxio-webufs": + return __assemble_runtime_by_kind("alluxio", "hbase") + elif testcase == "alluxio-oss": + return __assemble_runtime_by_kind("alluxio", "alluxio-demo-dataset") + elif testcase == "jindo-oss": + return __assemble_runtime_by_kind("jindo", "demo-dataset") + elif testcase == "juicefs-minio": + return __assemble_runtime_by_kind("juicefs", "jfsdemo") + + +def __assemble_runtime_by_kind(runtime_kind, name): + if runtime_kind == "alluxio": + runtime = Runtime("AlluxioRuntime", name) + runtime.set_replicas(1) + runtime.set_tieredstore("MEM", "/dev/shm", "4Gi") + + return runtime + elif runtime_kind == "jindo": + runtime = Runtime("JindoRuntime", name) + runtime.set_replicas(1) + runtime.set_tieredstore("MEM", "/dev/shm", "15Gi") + + return runtime + + elif runtime_kind == "juicefs": + runtime = Runtime("JuiceFSRuntime", name) + runtime.set_replicas(1) + runtime.set_tieredstore("MEM", "/dev/shm/cache1:/dev/shm/cache2", "4Gi", high="", low="0.1") + + return runtime diff --git a/test/prow/fluid/step_funcs.py b/test/prow/fluid/step_funcs.py new file mode 100644 index 00000000000..40a4f0eae42 --- /dev/null +++ b/test/prow/fluid/step_funcs.py @@ -0,0 +1,330 @@ +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(__file__)) +sys.path.insert(0, project_root) + +from kubernetes import client, config +from framework.step import check +from framework.exception import TestError + + +SERVERLESS_KEY="serverless.fluid.io/inject" +SERVERFUL_KEY="fuse.serverful.fluid.io/inject" + +def create_dataset_fn(dataset): + def create_dataset(): + + 
dataset_namespace = dataset["metadata"]["namespace"] + api = client.CustomObjectsApi() + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=dataset_namespace, + plural="datasets", + body=dataset, + ) + print("Dataset \"{}/{}\" created".format(dataset["metadata"]["namespace"], dataset["metadata"]["name"])) + + return create_dataset + +def check_dataset_bound_fn(name, namespace="default"): + def check(): + api = client.CustomObjectsApi() + + resource = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=name, + namespace=namespace, + plural="datasets" + ) + + if "status" in resource: + if "phase" in resource["status"]: + if resource["status"]["phase"] == "Bound": + return True + + return False + + return check + +def create_runtime_fn(runtime): + def create_runtime(): + plural_str = "{}s".format(runtime["kind"].lower()) + runtime_namespace = runtime["metadata"]["namespace"] + + api = client.CustomObjectsApi() + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=runtime_namespace, + plural=plural_str, + body=runtime + ) + print("{} \"{}/{}\" created".format(runtime["kind"], runtime["metadata"]["namespace"], + runtime["metadata"]["name"])) + + return create_runtime + +def delete_dataset_and_runtime_fn(runtime, name, namespace="default"): + def check_clean_up(): + api = client.CustomObjectsApi() + plural_str = "{}s".format(runtime["kind"].lower()) + + print("runtime still exists...") + try: + to_delete = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=name, + namespace=namespace, + plural=plural_str + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return True + + return False + + def delete_dataset(): + api = client.CustomObjectsApi() + + try: + api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=name, + namespace=namespace, + plural="datasets" 
+ ) + except client.exceptions.ApiException as e: + if e.status == 404: + return + else: + raise e + except Exception as e: + raise e + + + print("Dataset \"{}/{}\" deleted".format(namespace, name)) + + timeout_check_fn = check(check_clean_up, 60, 3) + timeout_check_fn() + + return delete_dataset + +def create_dataload_fn(dataload): + def create(): + api = client.CustomObjectsApi() + + name = dataload["metadata"]["name"] + namespace = dataload["metadata"]["namespace"] + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=namespace, + plural="dataloads", + body=dataload, + ) + + print("DataLoad \"{}/{}\" created.".format(namespace, name)) + + return create + +def check_dataload_job_status_fn(dataload_name, dataload_namespace="default"): + def check(): + api = client.CustomObjectsApi() + + resource = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=dataload_name, + namespace=dataload_namespace, + plural="dataloads" + ) + + if "status" in resource: + if "phase" in resource["status"]: + if resource["status"]["phase"] == "Complete": + return True + + return False + + return check + +def check_dataset_cached_percentage_fn(name, namespace="default"): + def check(): + api = client.CustomObjectsApi() + + resource = api.get_namespaced_custom_object_status( + group="data.fluid.io", + version="v1alpha1", + name=name, + namespace=namespace, + plural="datasets" + ) + + if "status" in resource: + if "ufsTotal" in resource["status"]: + if "cacheStates" in resource["status"] and "cached" in resource["status"]["cacheStates"]: + if resource["status"]["ufsTotal"] == resource["status"]["cacheStates"]["cached"]: + print("Checking Dataset warmed up status. 
Expected: %s, current status: %s" % ( + resource["status"]["ufsTotal"], resource["status"]["cacheStates"]["cached"])) + return True + + return False + + return check + +def check_volume_resource_ready_fn(name, namespace="default"): + def check(): + api = client.CoreV1Api() + + pv_name = "{}-{}".format(namespace, name) + pvc_name = name + + try: + api.read_persistent_volume(name=pv_name) + api.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.status == 404: + return False + except Exception as e: + return False + + print("PersistentVolume {} & PersistentVolumeClaim {}/{} ready.".format(pv_name, namespace, pvc_name)) + return True + + return check + +def create_job_fn(script, dataset_name, name="fluid-e2e-job-test", namespace="default", serverless=False): + def create(): + api = client.BatchV1Api() + + container = client.V1Container( + name="alpine", + image="alpine", + command=["/bin/sh"], + args=["-c", "set -ex; {}".format(script)], + volume_mounts=[client.V1VolumeMount(mount_path="/data", name="data-vol")] + ) + + if serverless: + obj_meta=client.V1ObjectMeta(labels={"app": "dataread", SERVERLESS_KEY: "true"}) + else: + obj_meta=client.V1ObjectMeta(labels={"app": "dataread"}) + + template = client.V1PodTemplateSpec( + metadata=obj_meta, + spec=client.V1PodSpec(restart_policy="Never", containers=[container], + volumes=[client.V1Volume(name="data-vol", + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=dataset_name))]) + ) + + spec = client.V1JobSpec( + template=template, + backoff_limit=4 + ) + + job = client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=name), + spec=spec + ) + + api.create_namespaced_job(namespace=namespace, body=job) + print("Job \"{}/{}\" created.[script=\"{}\"]".format(namespace, name, script)) + + return create + +def check_job_status_fn(name="fluid-e2e-job-test", namespace="default"): + def 
check(): + api = client.BatchV1Api() + + try: + response = api.read_namespaced_job_status( + name=name, + namespace=namespace + ) + + if response.status.succeeded is not None: + return True + except Exception as e: + print(e) + + return False + + return check + +def delete_job_fn(name="fluid-e2e-job-test", namespace="default"): + def delete(): + batch_api = client.BatchV1Api() + + body = client.V1DeleteOptions(propagation_policy='Background') + batch_api.delete_namespaced_job(name=name, namespace=namespace, body=body) + + return delete + +def create_pod_fn(dataset_name, name="nginx-test", namespace="default", serverless=False, serverful=False): + def create(): + api = client.CoreV1Api() + container = client.V1Container( + name="nginx", + image="nginx", + volume_mounts=[client.V1VolumeMount(mount_path="/data", name="data-vol")] + ) + + volume = client.V1Volume( + name="data-vol", + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=dataset_name) + ) + + labels = {} + if serverless: + labels[SERVERLESS_KEY] = "true" + if serverful: + labels[SERVERFUL_KEY] = "true" + + pod = client.V1Pod( + api_version="v1", + kind="Pod", + metadata=client.V1ObjectMeta(name=name, labels=labels), + spec=client.V1PodSpec( + containers=[container], + volumes=[volume] + ) + ) + + api.create_namespaced_pod(namespace=namespace, body=pod) + print("Pod {} created".format(name)) + + return create + +def check_pod_running_fn(name="nginx-test", namespace="default"): + def check(): + api = client.CoreV1Api() + pod_status = api.read_namespaced_pod(name, namespace).status + if pod_status.phase == "Running": + return True + + return False + + return check + +def delete_pod_fn(name="nginx-test", namespace="default"): + def delete(): + api = client.CoreV1Api() + body = client.V1DeleteOptions(propagation_policy='Background') + + try: + api.delete_namespaced_pod(name=name, namespace=namespace, body=body) + except client.exceptions.ApiException as e: + if e.status != 404: + 
raise TestError("failed to delete pod with code status {}".format(e.status)) + + return delete diff --git a/test/prow/framework/exception.py b/test/prow/framework/exception.py new file mode 100644 index 00000000000..13acade6511 --- /dev/null +++ b/test/prow/framework/exception.py @@ -0,0 +1,11 @@ +import os +import sys + +project_root = os.path.dirname(os.path.dirname(__file__)) +sys.path.insert(0, project_root) + + +class TestError(Exception): + def __init__(self, message): + self.msg = message + super(TestError, self).__init__(message) \ No newline at end of file diff --git a/test/prow/framework/step.py b/test/prow/framework/step.py new file mode 100644 index 00000000000..24cbb263188 --- /dev/null +++ b/test/prow/framework/step.py @@ -0,0 +1,63 @@ +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(__file__)) +sys.path.insert(0, project_root) + +from framework.exception import TestError + +def currying_fn(fn, **kwargs): + def curried_fn(): + return fn(**kwargs) + + return curried_fn + +def check(fn, retries, interval): + def check_internal(): + tt = 0 + while tt < retries: + if fn(): + return + time.sleep(interval) + tt += 1 + + raise TestError("timeout for {} seconds".format(retries * interval)) + + return check_internal + +def sleep(sleep_seconds): + def sleep_internal(): + time.sleep(sleep_seconds) + + return sleep_internal + + +def dummy_back(): + pass + + +class SimpleStep(): + def __init__(self, step_name, forth_fn, back_fn): + self.step_name = step_name + self.forth_fn = forth_fn + self.back_fn = back_fn + + def get_step_name(self): + return self.step_name + + def go_forth(self): + self.forth_fn() + + def go_back(self): + self.back_fn() + + +class StatusCheckStep(SimpleStep): + def __init__(self, step_name, forth_fn, back_fn=dummy_back, timeout=120, interval=1): + super().__init__(step_name, check(forth_fn, timeout, interval), back_fn) + + +class SleepStep(SimpleStep): + def __init__(self, sleep_seconds, 
back_fn=dummy_back): + super().__init__("sleep for {} seconds".format(sleep_seconds), sleep(sleep_seconds), back_fn) \ No newline at end of file diff --git a/test/prow/framework/testflow.py b/test/prow/framework/testflow.py new file mode 100644 index 00000000000..b5482ba3fd6 --- /dev/null +++ b/test/prow/framework/testflow.py @@ -0,0 +1,47 @@ +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(__file__)) +sys.path.insert(0, project_root) + +from framework.exception import TestError + + +class TestFlow(): + def __init__(self, case): + self.case = case + self.steps = [] + + def append_step(self, step): + self.steps.append(step) + + def run(self): + print("> Testcase \"{}\" started <".format(self.case)) + undergoing_step = 0 + total_steps = len(self.steps) + failed = False + try: + while undergoing_step < total_steps: + self.steps[undergoing_step].go_forth() + print("PASS {}".format(self.steps[undergoing_step].get_step_name())) + time.sleep(3) + undergoing_step += 1 + except TestError as e: + failed = True + print("FAIL {}".format(self.steps[undergoing_step].get_step_name())) + msg = e.msg + raise Exception("> Testcase \"{}\" failed at Step \"{}\": {}".format(self.case, self.steps[undergoing_step].get_step_name(), msg)) + except Exception as e: + failed = True + print("FAIL {}".format(self.steps[undergoing_step].get_step_name())) + raise e + finally: + if undergoing_step >= total_steps: + undergoing_step = total_steps - 1 + while undergoing_step >= 0: + self.steps[undergoing_step].go_back() + undergoing_step -= 1 + if not failed: + print("> Testcase \"{}\" succeeded <\n\n".format(self.case)) + diff --git a/test/prow/fuse_recovery.py b/test/prow/fuse_recovery.py deleted file mode 100644 index 407b89d396e..00000000000 --- a/test/prow/fuse_recovery.py +++ /dev/null @@ -1,306 +0,0 @@ -""" -TestCase: Recover Fuse -DDC Engine: Alluxio -Steps: -1. check if Fuse Recover is Enabled -2. create Dataset(WebUFS) & Runtime -3. 
check if dataset is bound -4. check if persistentVolumeClaim & PV is created -5. create data list Pod with Injection label -6. wait until Pod running & check if data list succeed -7. delete Alluxio Fuse Pod -8. check Fuse recovered -9. check if data list succeed -10. clean up -""" -import os -import time - -from kubernetes import client, config - -namespace = "default" - - -def getPodNameByPrefix(prefix, pod_namespace): - api = client.CoreV1Api() - pods = api.list_namespaced_pod(pod_namespace) - pods_name = [item.metadata.name for item in pods.items] - for name in pods_name: - if name.__contains__(prefix): - pod_name = name - return pod_name - return None - - -def checkCsiRecoverEnabled() -> bool: - """ - check if csi-nodeplugin-fluid-xxxx pod.spec.containers has args "FuseRecovery=true" - """ - fluid_namespace = "fluid-system" - pod_name = "csi-nodeplugin-fluid" - pod_name = getPodNameByPrefix(pod_name, fluid_namespace) - if pod_name is None: - return False - api = client.CoreV1Api() - for i in range(10): - pod = api.read_namespaced_pod(pod_name, fluid_namespace) - if str(pod.spec.containers).__contains__("FuseRecovery=true"): - print("CSI recovery enabled") - return True - time.sleep(5) - return False - - -def createDatasetAndRuntime(): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": "hbase"}, - "spec": { - "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/hbase/stable/", "name": "hbase"}] - } - } - - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": {"name": "hbase"}, - "spec": { - "replicas": 1, - "tieredstore": { - "levels": [{ - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - }] - } - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace=namespace, - plural="datasets", - body=my_dataset, - ) - print("Created 
dataset.") - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace=namespace, - plural="alluxioruntimes", - body=my_alluxioruntime - ) - print("Created alluxioruntime.") - - -def checkDatasetBound(): - api = client.CustomObjectsApi() - while True: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=namespace, - plural="datasets" - ) - print(resource) - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - time.sleep(5) - return True - time.sleep(1) - - -def checkFuseRecovered(): - ### get dataset hbase uid - api = client.CustomObjectsApi() - dataset = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace=namespace, - plural="datasets", - name="hbase") - uid = dataset['metadata']['uid'] - print("Dataset hbase uid is: {}".format(uid)) - while True: - uids = getFuseRecoveredUids() - print("Total uids are: {}".format(uids)) - if uids.__contains__(uid): - print("Fuse Recovered.") - return True - print("Fuse not Recovered.") - time.sleep(3) - - -def checkVolumeResourcesReady(): - while True: - try: - client.CoreV1Api().read_persistent_volume(name=namespace + "-hbase") - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim(name="hbase", namespace=namespace) - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - print("PersistentVolume & PersistentVolumeClaim Ready.") - break - time.sleep(5) - - -def createDataListPod(name): - api = client.CoreV1Api() - containers = [client.V1Container( - name="nginx", - image="nginx", - # mount_propagation="HostToContainer" - volume_mounts=[client.V1VolumeMount(mount_path="/data", name="hbase-vol")] - )] - volumes = [client.V1Volume( - name="hbase-vol", - 
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name="hbase") - )] - spec = client.V1PodSpec( - containers=containers, - volumes=volumes - ) - pod = client.V1Pod( - api_version="v1", - kind="Pod", - metadata=client.V1ObjectMeta(name=name, labels={"fuse.serverful.fluid.io/inject": "true"}), - spec=spec - ) - api.create_namespaced_pod(namespace=namespace, body=pod) - print("Pod created.") - time.sleep(5) - - -def checkDataListSuccess(name) -> bool: - cmd = "kubectl -n {} exec -it {} ls /data/hbase".format(namespace, name) - success = os.system(cmd) - if success == 0: - print("Data Read done.") - return True - else: - print("Data Read Fail") - return False - - -def deletePod(prefix, pod_namespace): - pod_name = getPodNameByPrefix(prefix, pod_namespace) - api = client.CoreV1Api() - api.delete_namespaced_pod(pod_name, pod_namespace) - time.sleep(5) - print("Delete pod: {}".format(pod_name)) - - -def cleanUp(pod_name): - api = client.CoreV1Api() - # Delete Data Read Pod - body = client.V1DeleteOptions(propagation_policy='Background') - if getPodNameByPrefix(pod_name, namespace) is not None: - api.delete_namespaced_pod(name=pod_name, namespace=namespace, body=body) - print("Delete pod:{}".format(pod_name)) - time.sleep(5) - - # Delete Dataset & Alluxioruntime - custom_api = client.CustomObjectsApi() - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=namespace, - plural="datasets" - ) - time.sleep(5) - runtimeDelete = False - while not runtimeDelete: - print("runtime still exists...") - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name="hbase", - namespace=namespace, - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - -def checkPodReady(name, pod_namespace) -> bool: - api = client.CoreV1Api() - while True: - name = 
getPodNameByPrefix(name, pod_namespace) - if name is None: - return False - pod = api.read_namespaced_pod(name, pod_namespace).status - if pod.phase == "Running": - print("Pod-{} is {}.".format(name, pod.phase)) - return True - else: - print("Pod-{} is {}.".format(name, pod.phase)) - time.sleep(1) - - -def deleteAlluxioFusePod(): - deletePod("hbase-fuse", namespace) - print("Delete Fuse Pod:{}".format("hbase-fuse-xxxx")) - time.sleep(60) - - -def getFuseRecoveredUids(): - api = client.CoreV1Api() - items = api.list_namespaced_event(namespace=namespace).items - fuseRecoveryUids = set() - for item in items: - if item.message.__contains__("Fuse recover"): - fuseRecoveryUids.add(item.involved_object.uid) - return fuseRecoveryUids - - -def main(): - exit_code = 0 - ### Load config - config.load_incluster_config() - if checkCsiRecoverEnabled() is False: - return 1 - - ### Create dataset & alluxioruntime - createDatasetAndRuntime() - checkDatasetBound() - checkVolumeResourcesReady() - - ### Create Pod with Injection Label - createDataListPod("nginx") - if checkPodReady("nginx", namespace): - time.sleep(5) - checkDataListSuccess("nginx") - - ### Delete fuse - deleteAlluxioFusePod() - if checkPodReady("hbase-fuse", namespace) and checkFuseRecovered(): - time.sleep(5) - if checkDataListSuccess("nginx"): - exit_code = 0 - else: - exit_code = 1 - cleanUp("nginx") - return exit_code - - -if __name__ == '__main__': - exit(main()) diff --git a/test/prow/juicefs_access_data.py b/test/prow/juicefs_access_data.py deleted file mode 100644 index 14306f55e76..00000000000 --- a/test/prow/juicefs_access_data.py +++ /dev/null @@ -1,577 +0,0 @@ -# Copyright 2022 The Fluid Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -TestCase: Pod accesses Juicefs data -DDC Engine: Juicefs(Community) with local redis and minio - -Prerequisite: -1. docker run -d -p 9000:9000 \ - --name minio \ - -e "MINIO_ROOT_USER=minioadmin" \ - -e "MINIO_ROOT_PASSWORD=minioadmin" \ - minio/minio server /data -2. docker run -itd --name redis -p 6379:6379 redis -3. Write down the node IP -4. Apply the following secret -``` -apiVersion: v1 -kind: Secret -metadata: - name: jfs-secret -stringData: - metaurl: redis://:6379/0 - access-key: minioadmin - secret-key: minioadmin -``` - -Steps: -1. create Dataset & Runtime -2. check if dataset is bound -3. check if PVC & PV is created -4. submit data write job -5. wait until data write job completes -6. submit data read job -7. check if data content consistent -8. 
clean up -""" - -import time - -from kubernetes import client, config - -NODE_IP = "minio" -APP_NAMESPACE = "default" -SECRET_NAME = "jfs-secret" - - -def create_redis_secret(): - api = client.CoreV1Api() - jfs_secret = { - "apiVersion": "v1", - "kind": "Secret", - "metadata": {"name": SECRET_NAME}, - "stringData": {"metaurl": "redis://redis:6379/0", "accesskey": "minioadmin", "secretkey": "minioadmin"} - } - - api.create_namespaced_secret(namespace=APP_NAMESPACE, body=jfs_secret) - print("Created secret {}".format(SECRET_NAME)) - - -def create_dataset_and_runtime(dataset_name): - api = client.CustomObjectsApi() - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, - "spec": { - "mounts": [{ - "mountPoint": "juicefs:///", - "name": "juicefs-community", - "options": {"bucket": "http://%s:9000/minio/test" % NODE_IP, "storage": "minio"}, - "encryptOptions": [ - {"name": "metaurl", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "metaurl"}}}, - {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}}, - {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}} - ] - }], - "accessModes": ["ReadWriteMany"] - } - } - - my_juicefsruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "JuiceFSRuntime", - "metadata": {"name": dataset_name, "namespace": APP_NAMESPACE}, - "spec": { - "replicas": 1, - "tieredstore": {"levels": [ - {"mediumtype": "MEM", "path": "/dev/shm/cache1:/dev/shm/cache2", "quota": "400Mi", "low": "0.1"}]} - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=my_dataset, - ) - print("Create dataset {}".format(dataset_name)) - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="juicefsruntimes", - 
body=my_juicefsruntime - ) - print("Create juicefs runtime {}".format(dataset_name)) - - -def create_datamigrate(datamigrate_name, dataset_name): - api = client.CustomObjectsApi() - my_datamigrate = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "DataMigrate", - "metadata": {"name": datamigrate_name, "namespace": APP_NAMESPACE}, - "spec": { - "image": "registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse", - "imageTag": "nightly", - "from": { - "dataset": {"name": dataset_name, "namespace": APP_NAMESPACE} - }, - "to": {"externalStorage": { - "uri": "minio://%s:9000/minio/test/" % NODE_IP, - "encryptOptions": [ - {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}}, - {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}}, - ] - }} - }, - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datamigrates", - body=my_datamigrate, - ) - print("Create datamigrate {}".format(datamigrate_name)) - - -def check_datamigrate_complete(datamigrate_name): - api = client.CustomObjectsApi() - - count = 0 - while count < 300: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=datamigrate_name, - namespace=APP_NAMESPACE, - plural="datamigrates" - ) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Complete": - print("Datamigrate {} is complete.".format(datamigrate_name)) - return True - time.sleep(1) - count += 1 - print("Datamigrate {} is not complete within 300s.".format(datamigrate_name)) - return False - - -def get_worker_node(dataset_name): - api = client.CoreV1Api() - pod_name = "{}-worker-0".format(dataset_name) - count = 0 - while count < 300: - count += 1 - try: - pod = api.read_namespaced_pod(name=pod_name, namespace=APP_NAMESPACE) - return pod.spec.node_name - except client.exceptions.ApiException as e: - if 
e.status == 404: - time.sleep(1) - continue - return "" - - -def create_check_cache_job(node_name): - print("Create check cache job") - api = client.BatchV1Api() - - container = client.V1Container( - name="demo", - image="debian:buster", - command=["/bin/bash"], - args=["-c", "if [ $(find /dev/shm/* | grep chunks | wc -l) = 0 ]; then exit 0; else exit 1; fi"], - volume_mounts=[client.V1VolumeMount(mount_path="/dev/shm/cache1", name="cache1"), - client.V1VolumeMount(mount_path="/dev/shm/cache2", name="cache2")] - ) - - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": "checkcache"}), - spec=client.V1PodSpec( - restart_policy="Never", - containers=[container], - volumes=[ - client.V1Volume( - name="cache1", - host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache1") - ), - client.V1Volume( - name="cache2", - host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache2") - ) - ], - node_name=node_name, - ) - ) - - spec = client.V1JobSpec(template=template, backoff_limit=4) - - job = client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name="checkcache", namespace=APP_NAMESPACE), - spec=spec - ) - - api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) - print("Job {} created.".format("checkcache")) - - -def check_dataset_bound(dataset_name): - api = client.CustomObjectsApi() - - count = 0 - while count < 300: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset_name, - namespace=APP_NAMESPACE, - plural="datasets" - ) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - print("Dataset {} is bound.".format(dataset_name)) - return True - time.sleep(1) - count += 1 - print("Dataset {} is not bound within 300s.".format(dataset_name)) - return False - - -def check_volume_resources_ready(dataset_name): - pv_name = "{}-{}".format(APP_NAMESPACE, dataset_name) - pvc_name = dataset_name - 
count = 0 - while count < 300: - count += 1 - try: - client.CoreV1Api().read_persistent_volume(name=pv_name) - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim(name=pvc_name, namespace=APP_NAMESPACE) - except client.exceptions.ApiException as e: - if e.status == 404: - time.sleep(1) - continue - - print("PersistentVolume {} & PersistentVolumeClaim {} Ready.".format(pv_name, pvc_name)) - return True - print("PersistentVolume {} & PersistentVolumeClaim {} not ready within 300s.".format(pv_name, pvc_name)) - return False - - -def create_data_write_job(dataset_name, job_name, use_sidecar=False): - pvc_name = dataset_name - api = client.BatchV1Api() - - container = client.V1Container( - name="demo", - image="debian:buster", - command=["/bin/bash"], - args=["-c", "dd if=/dev/zero of=/data/allzero.file bs=100M count=10 && sha256sum /data/allzero.file"], - volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] - ) - - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": "datawrite"}), - spec=client.V1PodSpec( - restart_policy="Never", - containers=[container], - volumes=[client.V1Volume( - name="demo", - persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) - )] - ) - ) - if use_sidecar: - template.metadata.labels["serverless.fluid.io/inject"] = "true" - - spec = client.V1JobSpec(template=template, backoff_limit=4) - - job = client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), - spec=spec - ) - - api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) - print("Job {} created.".format(job_name)) - - -def create_data_read_job(dataset_name, job_name, use_sidecar=False): - pvc_name = dataset_name - api = client.BatchV1Api() - - container = client.V1Container( - name="demo", - image="debian:buster", - 
command=["/bin/bash"], - args=["-c", "time sha256sum /data/allzero.file && rm /data/allzero.file"], - volume_mounts=[client.V1VolumeMount(mount_path="/data", name="demo")] - ) - - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": "dataread"}), - spec=client.V1PodSpec( - restart_policy="Never", - containers=[container], - volumes=[client.V1Volume( - name="demo", - persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name) - )] - ) - ) - if use_sidecar: - template.metadata.labels["serverless.fluid.io/inject"] = "true" - - spec = client.V1JobSpec(template=template, backoff_limit=4) - - job = client.V1Job( - api_version="batch/v1", - kind="Job", - metadata=client.V1ObjectMeta(name=job_name, namespace=APP_NAMESPACE), - spec=spec - ) - - api.create_namespaced_job(namespace=APP_NAMESPACE, body=job) - print("Data Read Job {} created.".format(job_name)) - - -def check_data_job_status(job_name): - api = client.BatchV1Api() - - count = 0 - while count < 300: - count += 1 - response = api.read_namespaced_job_status(name=job_name, namespace=APP_NAMESPACE) - if response.status.succeeded is not None: - print("Job {} completed.".format(job_name)) - return True - if response.status.failed is not None: - print("Job {} failed.".format(job_name)) - return False - time.sleep(1) - print("Job {} not completed within 300s.".format(job_name)) - return False - - -def clean_job(job_name): - batch_api = client.BatchV1Api() - - # See https://github.com/kubernetes-client/python/issues/234 - body = client.V1DeleteOptions(propagation_policy='Background') - try: - batch_api.delete_namespaced_job(name=job_name, namespace=APP_NAMESPACE, body=body) - except client.exceptions.ApiException as e: - if e.status == 404: - print("job {} deleted".format(job_name)) - return True - - count = 0 - while count < 300: - count += 1 - print("job {} still exists...".format(job_name)) - try: - batch_api.read_namespaced_job(name=job_name, 
namespace=APP_NAMESPACE) - except client.exceptions.ApiException as e: - if e.status == 404: - print("job {} deleted".format(job_name)) - return True - time.sleep(1) - - print("job {} not deleted within 300s".format(job_name)) - return False - - -def clean_up_dataset_and_runtime(dataset_name): - custom_api = client.CustomObjectsApi() - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset_name, - namespace=APP_NAMESPACE, - plural="datasets" - ) - print("Dataset {} deleted".format(dataset_name)) - - count = 0 - while count < 300: - count += 1 - print("JuiceFSRuntime {} still exists...".format(dataset_name)) - try: - custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset_name, - namespace=APP_NAMESPACE, - plural="juicefsruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - print("JuiceFSRuntime {} is cleaned up".format(dataset_name)) - return True - time.sleep(1) - print("JuiceFSRuntime {} is not cleaned up within 300s".format(dataset_name)) - return False - - -def clean_up_datamigrate(datamigrate_name): - custom_api = client.CustomObjectsApi() - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=datamigrate_name, - namespace=APP_NAMESPACE, - plural="datamigrates" - ) - print("Datamigrate {} deleted".format(datamigrate_name)) - - -def clean_up_secret(): - core_api = client.CoreV1Api() - core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=APP_NAMESPACE) - print("secret {} is cleaned up".format(SECRET_NAME)) - - -def main(): - config.load_incluster_config() - - # **************************************************************** - # ------- test normal mode ------- - # **************************************************************** - dataset_name = "jfsdemo" - datamigrate_name = "jfsdemo" - test_write_job = "demo-write" - test_read_job = "demo-read" - try: - # 1. 
create secret - create_redis_secret() - - # 2. create dataset and runtime - create_dataset_and_runtime(dataset_name) - if not check_dataset_bound(dataset_name): - raise Exception("dataset {} in normal mode is not bound.".format(dataset_name)) - if not check_volume_resources_ready(dataset_name): - raise Exception("volume resources of dataset {} in normal mode are not ready.".format(dataset_name)) - - node_name = get_worker_node(dataset_name) - - # 3. create write & read data job - create_data_write_job(dataset_name, test_write_job) - if not check_data_job_status(test_write_job): - raise Exception("write job {} in normal mode failed.".format(test_write_job)) - create_data_read_job(dataset_name, test_read_job) - if not check_data_job_status(test_read_job): - raise Exception("read job {} in normal mode failed.".format(test_read_job)) - - # **************************************************************** - # ------- test data migrate ------- - # **************************************************************** - # 1. create datamigrate - create_datamigrate(datamigrate_name, dataset_name) - - # 2. check datamigrate status - if not check_datamigrate_complete(datamigrate_name): - raise Exception("datamigrate {} failed.".format(datamigrate_name)) - - except Exception as e: - print(e) - exit(-1) - finally: - # clear - # 1. clean up write & read data job - clean_job(test_write_job) - clean_job(test_read_job) - - # 2. clean up datamigrate - clean_up_datamigrate(datamigrate_name) - - # 3. clean up dataset and runtime - clean_up_dataset_and_runtime(dataset_name) - - # 4. 
clean up secret - clean_up_secret() - - # **************************************************************** - # ------- test cache clear after runtime shutdown ------- - # **************************************************************** - try: - create_check_cache_job(node_name) - if not check_data_job_status("checkcache"): - raise Exception("read job {} in normal mode failed.".format("checkcache")) - except Exception as e: - print(e) - exit(-1) - finally: - # clean up check cache job - clean_job("checkcache") - - # **************************************************************** - # ------- test sidecar mode ------- - # **************************************************************** - dataset_name = "jfsdemo-sidecar" - test_write_job = "demo-write-sidecar" - test_read_job = "demo-read-sidecar" - try: - # 1. create secret - create_redis_secret() - - # 2. create dataset and runtime - create_dataset_and_runtime(dataset_name) - if not check_dataset_bound(dataset_name): - raise Exception("dataset {} in sidecar mode is not bound.".format(dataset_name)) - if not check_volume_resources_ready(dataset_name): - raise Exception("volume resources of dataset {} in sidecar mode are not ready.".format(dataset_name)) - - # 3. create write & read data job - create_data_write_job(dataset_name, test_write_job, use_sidecar=True) - if not check_data_job_status(test_write_job): - raise Exception("write job {} in sidecar mode failed.".format(test_write_job)) - create_data_read_job(dataset_name, test_read_job, use_sidecar=True) - if not check_data_job_status(test_read_job): - raise Exception("read job {} in sidecar mode failed.".format(test_read_job)) - except Exception as e: - print(e) - exit(-1) - finally: - # 4. clean up write & read data job - clean_job(test_write_job) - clean_job(test_read_job) - - # 5. clean up dataset and runtime - clean_up_dataset_and_runtime(dataset_name) - - # 6. 
clean up secret - clean_up_secret() - - -if __name__ == '__main__': - main() diff --git a/test/prow/patch_node_label_parallel.py b/test/prow/patch_node_label_parallel.py deleted file mode 100644 index 8ec2671f971..00000000000 --- a/test/prow/patch_node_label_parallel.py +++ /dev/null @@ -1,230 +0,0 @@ -""" -TestCase: Patch Node Label Parallel -DDC Engine: Alluxio -Steps: -1. patch label -2. create dataset and runtime demo1 and demo2 -3. check label -4. delete dataset demo1 and create dataset demo3 -5. check label -6. clean up -""" - -from kubernetes import client, config -from kubernetes.client.rest import ApiException - -import time - - - -def getAttributeNode(): - api = client.CoreV1Api() - node_list = api.list_node() - if len(node_list.items) > 0: - return node_list.items[0] - return None - -def getNodes(): - api = client.CoreV1Api() - node_list = api.list_node() - return node_list - - -def patchNodeLabel(key, value, node): - api = client.CoreV1Api() - - body = { - "metadata": { - "labels": { - key: value, - } - } - } - - # Patching the node labels - api_response = api.patch_node(node.metadata.name, body) - print("node label: %s\t%s" % (node.metadata.name, node.metadata.labels)) - -def checkLabel(*datasets, node): - api = client.CoreV1Api() - try: - latestNode = api.read_node(node.metadata.name) - # print(latestNode) - except ApiException as e: - print("Exception when calling CoreV1Api->read_node: %s\n" % e) - return False - labels = latestNode.metadata.labels - - dataNumKey = "fluid.io/dataset-num" - alluxioKeyPrefix = "fluid.io/s-alluxio-default-" - datasetKeyPrefix = "fluid.io/s-default-" - # check dataset number label - # print(labels) - if dataNumKey not in labels or labels[dataNumKey] != str(len(datasets)): - return False - # check alluxio label - for dataset in datasets: - alluxioKey = alluxioKeyPrefix + dataset - datasetKey = datasetKeyPrefix + dataset - if alluxioKey not in labels or labels[alluxioKey] != "true": - return False - if datasetKey not in 
labels or labels[datasetKey] != "true": - return False - return True - - - -def createDatasetAndRuntime(*runtimes): - api = client.CustomObjectsApi() - for runtime in runtimes: - my_dataset = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "Dataset", - "metadata": {"name": runtime}, - "spec": { - "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/spark/", "name": "hbase"}], - "nodeAffinity": { - "required": { - "nodeSelectorTerms": [{ - "matchExpressions": [{ - "key": "fluid", - "operator": "In", - "values": ["multi-dataset"] - }] - }] - } - }, - "placement": "Shared" - } - } - print(my_dataset) - - my_alluxioruntime = { - "apiVersion": "data.fluid.io/v1alpha1", - "kind": "AlluxioRuntime", - "metadata": {"name": runtime}, - "spec": { - "replicas": 1, - "podMetadata": { - "labels": { - "foo": "bar" - } - }, - "tieredstore": { - "levels": [{ - "mediumtype": "MEM", - "path": "/dev/shm", - "quota": "2Gi", - "high": "0.95", - "low": "0.7" - }] - } - } - } - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="datasets", - body=my_dataset, - ) - print("Created dataset %s." 
% (runtime)) - - api.create_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - namespace="default", - plural="alluxioruntimes", - body=my_alluxioruntime - ) - print("Created runtime %s" % (runtime)) - - - -def checkDatasetBound(dataset): - api = client.CustomObjectsApi() - - while True: - resource = api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset, - namespace="default", - plural="datasets" - ) - - print(resource) - - if "status" in resource: - if "phase" in resource["status"]: - if resource["status"]["phase"] == "Bound": - break - time.sleep(1) - print(resource) - - -def cleanDatasetAndRuntime(*datasets): - custom_api = client.CustomObjectsApi() - for dataset in datasets: - custom_api.delete_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset, - namespace="default", - plural="datasets" - ) - - runtimeDelete = False - while not runtimeDelete: - print("runtime %s still exists..." % (dataset)) - try: - runtime = custom_api.get_namespaced_custom_object( - group="data.fluid.io", - version="v1alpha1", - name=dataset, - namespace="default", - plural="alluxioruntimes" - ) - except client.exceptions.ApiException as e: - if e.status == 404: - runtimeDelete = True - continue - - time.sleep(1) - - -def main(): - config.load_incluster_config() - # 1. patch label - nodes = getNodes() - if len(nodes.items) == 0: - return 1 - node = nodes.items[0] - patchNodeLabel("fluid", "multi-dataset", node) - # 2. create dataset and runtime demo1 and demo2 - createDatasetAndRuntime("demo1", "demo2") - checkDatasetBound("demo1") - checkDatasetBound("demo2") - time.sleep(20) - # 3. check label - if not checkLabel("demo1", "demo2", node=node): - print("[checkabel] label not found") - return 1 - # 4. delete dataset demo1 and create dataset demo3 - cleanDatasetAndRuntime("demo1") - createDatasetAndRuntime("demo3") - checkDatasetBound("demo3") - time.sleep(20) - # 5. 
check label - if not checkLabel("demo2", "demo3", node=node): - print("[checkabel] label not found") - return 1 - # 6. clean all - cleanDatasetAndRuntime("demo2", "demo3") - patchNodeLabel("fluid", None, node) - return 0 - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/test/prow/testcases/alluxio/alluxio_dynamic_mountpoint.py b/test/prow/testcases/alluxio/alluxio_dynamic_mountpoint.py new file mode 100644 index 00000000000..dfbf3dc61b1 --- /dev/null +++ b/test/prow/testcases/alluxio/alluxio_dynamic_mountpoint.py @@ -0,0 +1,230 @@ +""" +TestCase: Alluxio dynamic changes mountpoints +DDC Engine: Alluxio +Steps: +1. create Dataset(WebUFS) & Runtime with two mountpoint +2. check if dataset is bound +3. check if persistentVolumeClaim & PV is created +4. check alluxioruntime mountpoint and data +5. change dataset mountpoint and update +6. check dataset is bound and mountpoint change +7. check if alluxio master recover after crash +8. clean up +""" + +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, dummy_back, currying_fn +from framework.exception import TestError + +from kubernetes import client, config +from kubernetes.stream import stream + +def checkAlluxioruntimeMountpoint(dataset_name, namespace, mp1, mp2): + exec_command = ["/bin/sh", + "-c", + "alluxio fs mount"] + resp = stream( + client.CoreV1Api().connect_get_namespaced_pod_exec, "{}-master-0".format(dataset_name), namespace, + command=exec_command, stderr=True, stdin=False, + stdout=True, tty=False, container='alluxio-master') + print("Response: " + resp) + if mp1 not in resp or mp2 not in resp: + print("checkAlluxioruntimeMountpoint Failed") + return False + + return True + +def 
change_dataset_mount_point(new_dataset, name, namespace): + client.CustomObjectsApi().patch_namespaced_custom_object( + name=name, + group="data.fluid.io", + version="v1alpha1", + namespace=namespace, + plural="datasets", + body=new_dataset, + ) + + print("new dataset patched") + +def check_dataset_mount_change(name, namespace): + resource = client.CustomObjectsApi().get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=name, + namespace=namespace, + plural="datasets" + ) + + # print(resource) + + if "status" in resource: + if "mounts" in resource["status"]: + print(resource["status"]["mounts"]) + if resource["status"]["mounts"][0]["name"] == "zookeeper" or resource["status"]["mounts"][1]["name"] == "zookeeper": + return True + + return False + +def checkRecoverAfterCrash(): + # exec the master pod and kill + exec_command = ["/bin/sh", + "-c", + "kill 1"] + resp = stream( + client.CoreV1Api().connect_get_namespaced_pod_exec, "hbase-master-0", "default", + command=exec_command, stderr=True, stdin=False, + stdout=True, tty=False, container='alluxio-master') + print("Response: " + resp) + + api = client.CoreV1Api() + time.sleep(1) + response = api.read_namespaced_pod( + name="hbase-master-0", namespace="default") + while response.status.phase != "Running": + time.sleep(1) + response = api.read_namespaced_pod( + name="hbase-master-0", namespace="default") + print(response) + + +def cleanUp(): + custom_api = client.CustomObjectsApi() + + custom_api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name="hbase", + namespace="default", + plural="datasets" + ) + + runtimeDelete = False + while not runtimeDelete: + print("runtime still exists...") + try: + runtime = custom_api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name="hbase", + namespace="default", + plural="alluxioruntimes" + ) + except client.exceptions.ApiException as e: + if e.status == 404: + runtimeDelete = True + 
continue + + time.sleep(1) + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + name = "hbase-mount" + namespace = "default" + + mount = fluidapi.Mount() + mount.set_mount_info("hadoop", "https://mirrors.bit.edu.cn/apache/hadoop/common/stable/") + + dataset = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, name) \ + .add_mount(mount.dump()) + + runtime = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, name) + + flow = TestFlow("Alluxio - Test Dynamically Change Mountpoints") + + flow.append_step( + SimpleStep( + step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataset is bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check alluxio runtime mount point", + forth_fn=currying_fn(checkAlluxioruntimeMountpoint, dataset_name=name, namespace=namespace, mp1="zookeeper", mp2="hadoop"), + timeout=10 + ) + ) + + new_mount = fluidapi.Mount() + new_mount.set_mount_info("hbase", "https://mirrors.bit.edu.cn/apache/hbase/stable/") + + + new_dataset = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, name) \ + .add_mount(new_mount.dump()) + + flow.append_step( + SimpleStep( + step_name="patch new mount point to dataset", + forth_fn=currying_fn(change_dataset_mount_point, new_dataset=new_dataset.dump(), name=name, namespace=namespace), + 
back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if mount point changed", + forth_fn=currying_fn(check_dataset_mount_change, name=name, namespace=namespace) + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + # createDatasetAndRuntime() + # checkDatasetBound() + # checkVolumeResourcesReady() + # res_check_mountpoint0 = checkAlluxioruntimeMountpoint("hbase", "hadoop") + # changeDatasetMountpoint() + # res_check_mountpoint1 = checkAlluxioruntimeMountpoint("hbase", "zookeeper") + # checkRecoverAfterCrash() + # cleanUp() + # if res_check_mountpoint0 == 1 or res_check_mountpoint1 == 1: + # exit(-1) + + +if __name__ == "__main__": + main() diff --git a/test/prow/testcases/alluxio/alluxio_webufs.py b/test/prow/testcases/alluxio/alluxio_webufs.py new file mode 100644 index 00000000000..ec5410879da --- /dev/null +++ b/test/prow/testcases/alluxio/alluxio_webufs.py @@ -0,0 +1,91 @@ +""" +TestCase: Access WebUFS data +DDC Engine: Alluxio +Steps: +1. create Dataset(WebUFS) & Runtime +2. check if dataset is bound +3. check if persistentVolumeClaim & PV is created +4. submit data read job +5. wait until data read job completes +6. 
clean up +""" +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, dummy_back +from framework.exception import TestError + +from kubernetes import client, config + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + name = "hbase" + namespace = "default" + + dataset = fluidapi.assemble_dataset("alluxio-webufs").set_namespaced_name(namespace, name) + runtime = fluidapi.assemble_runtime("alluxio-webufs").set_namespaced_name(namespace, name) + + flow = TestFlow("Alluxio - Access webufs data") + + flow.append_step( + SimpleStep( + step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataset is bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + flow.append_step( + SimpleStep( + step_name="create data read job", + forth_fn=funcs.create_job_fn("time cp -r /data/zookeeper ./", name), + back_fn=funcs.delete_job_fn() + ) + ) + flow.append_step( + StatusCheckStep( + step_name="check if data read job success", + forth_fn=funcs.check_job_status_fn() + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git 
a/test/prow/testcases/alluxio/alluxio_webufs_set_resources.py b/test/prow/testcases/alluxio/alluxio_webufs_set_resources.py new file mode 100644 index 00000000000..638e33a1acc --- /dev/null +++ b/test/prow/testcases/alluxio/alluxio_webufs_set_resources.py @@ -0,0 +1,190 @@ +""" +TestCase: Resources setting for alluxio runtime +DDC Engine: Alluxio +Steps: +1. create Dataset(WebUFS) & Runtime with specified resource +2. check if dataset is bound +3. check if persistentVolumeClaim & PV is created +4. check if alluxio runtime resources are consistent with expected +5. clean up +""" + +import os +import sys + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, dummy_back + +from kubernetes import client, config + + +def check_alluxio_runtime_components_resource_fn(name, namespace="default"): + def check(): + api = client.CoreV1Api() + + response = api.read_namespaced_pod(name="{}-master-0".format(name), namespace=namespace) + master_resource_check = True + for container in response.spec.containers: + if container.name == "master": # master + if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ + container.resources.requests["cpu"] == "1" and container.resources.requests["memory"] == "4Gi": + continue + else: + master_resource_check = False + elif container.name == "jobmaster": # jobmaster + if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ + container.resources.requests["cpu"] == "1500m" and container.resources.requests["memory"] == "4Gi": + continue + else: + master_resource_check = False + + if master_resource_check: + print("Master Resource Check Pass") + + response = api.read_namespaced_pod(name="{}-worker-0".format(name), namespace=namespace) + 
worker_resource_check = True + for container in response.spec.containers: + if container.resources.limits["cpu"] == "2" and container.resources.limits["memory"] == "8Gi" and \ + container.resources.requests["cpu"] == "1" and container.resources.requests["memory"] == "4Gi": + continue + else: + worker_resource_check = False + + if worker_resource_check: + print("Worker Resource Check Pass") + + return master_resource_check and worker_resource_check + + return check + +def set_alluxio_runtime_resource(runtime): + runtime.resource["spec"]["master"] = { + "resources": { + "requests": { + "cpu": "1000m", + "memory": "4Gi" + }, + "limits": { + "cpu": "2000m", + "memory": "8Gi" + } + } + } + + runtime.resource["spec"]["jobMaster"] = { + "resources": { + "requests": { + "cpu": "1500m", + "memory": "4Gi" + }, + "limits": { + "cpu": "2000m", + "memory": "8Gi" + } + } + } + + runtime.resource["spec"]["worker"] = { + "resources": { + "requests": { + "cpu": "1000m", + "memory": "4Gi" + }, + "limits": { + "cpu": "2000m", + "memory": "8Gi" + } + } + } + + runtime.resource["spec"]["jobWorker"] = { + "resources": { + "requests": { + "cpu": "1000m", + "memory": "4Gi" + }, + "limits": { + "cpu": "2000m", + "memory": "8Gi" + } + } + } + + runtime.resource["spec"]["fuse"] = { + "resources": { + "requests": { + "cpu": "1000m", + "memory": "4Gi" + }, + "limits": { + "cpu": "2000m", + "memory": "8Gi" + } + } + } + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + name = "alluxio-resources" + namespace = "default" + + dataset = fluidapi.assemble_dataset("alluxio-webufs").set_namespaced_name(namespace, name) + runtime = fluidapi.assemble_runtime("alluxio-webufs").set_namespaced_name(namespace, name).set_tieredstore(mediumtype="MEM", path="/dev/shm", quota="4Gi") + set_alluxio_runtime_resource(runtime) + + flow = TestFlow("Alluxio - Set AlluxioRuntime resources") + + flow.append_step( + SimpleStep( + 
step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataset is bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check runtime resources", + forth_fn=check_alluxio_runtime_components_resource_fn(name, namespace) + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + +if __name__ == '__main__': + main() diff --git a/test/prow/testcases/common/csi_stale_patch.py b/test/prow/testcases/common/csi_stale_patch.py new file mode 100644 index 00000000000..69e0ac8f356 --- /dev/null +++ b/test/prow/testcases/common/csi_stale_patch.py @@ -0,0 +1,371 @@ +""" +TestCase: CSI Plugin Stale Node Patch Verification +DDC Engine: Alluxio +Steps: +1. create Dataset & Runtime +2. check if dataset is bound +3. create app pod +4. check app pod is running +5. add node label +6. delete app pod +7. create app pod again +8. check app pod is running +9. check added label exist +10. 
clean up +""" +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +from kubernetes import client, config + +from kubernetes.client.rest import ApiException + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, SleepStep, dummy_back, currying_fn + +from kubernetes import client, config + + +def getNodes(): + api = client.CoreV1Api() + node_list = api.list_node() + return node_list + + +NS = "default" +def createDatasetAndRuntime(): + api = client.CustomObjectsApi() + my_dataset = { + "apiVersion": "data.fluid.io/v1alpha1", + "kind": "Dataset", + "metadata": {"name": "hbase", "namespace": NS}, + "spec": { + "mounts": [{"mountPoint": "https://mirrors.bit.edu.cn/apache/spark/", + "name": "hbase"}] + } + } + + my_alluxioruntime = { + "apiVersion": "data.fluid.io/v1alpha1", + "kind": "AlluxioRuntime", + "metadata": {"name": "hbase", "namespace": NS}, + "spec": { + "replicas": 1, + "tieredstore": { + "levels": [{ + "mediumtype": "MEM", + "path": "/dev/shm", + "quota": "2Gi", + "high": "0.95", + "low": "0.7" + }] + } + } + } + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=NS, + plural="datasets", + body=my_dataset, + ) + + print("Created dataset.") + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=NS, + plural="alluxioruntimes", + body=my_alluxioruntime + ) + + print("Created alluxioruntime.") + +def checkDatasetBound(): + api = client.CustomObjectsApi() + + while True: + resource = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name="hbase", + namespace=NS, + plural="datasets" + ) + + if "status" in resource: + if "phase" in resource["status"]: + if resource["status"]["phase"] == "Bound": + break + print("Not bound.") + 
time.sleep(1) + # print(resource) + + +def createApp(node_name, dataset_name, namespace="default"): + api = client.CoreV1Api() + my_app = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": "nginx", "namespace": namespace}, + "spec": { + "nodeName": node_name, + "containers": [ + { + "name": "nginx", + "image": "nginx", + "volumeMounts": [{"mountPath": "/data", "name": "hbase-vol"}] + } + ], + "volumes": [{ + "name": "hbase-vol", + "persistentVolumeClaim": { + "claimName": dataset_name + } + }] + } + } + api.create_namespaced_pod(NS, my_app) + print("Create pod.") + +# def checkAppRun(): +# api = client.CoreV1Api() +# while True: +# resource = api.read_namespaced_pod("nginx", NS) +# if (resource.status.phase == "Running"): +# print("App running.") +# print(resource.spec) +# return resource.spec.node_name +# print("App pod is not running.") +# time.sleep(1) + +def check_app_run(namespace="default"): + api = client.CoreV1Api() + resource = api.read_namespaced_pod("nginx", namespace) + if resource.status.phase == "Running": + print("Nginx App Running") + return True + + return False + +def addLabel(node_name): + api = client.CoreV1Api() + resource = api.read_node(node_name) + resource.metadata.labels['test-stale'] = 'true' + api.patch_node(node_name, resource) + print("Add node label.") + +def check_label(node_name): + api = client.CoreV1Api() + resource = api.read_node(node_name) + if (resource.metadata.labels['test-stale'] and resource.metadata.labels['test-stale'] == 'true'): + print("Added label exists.") + return True + else: + print("Added label does not exist.") + return False + +def delete_label(node_name): + api = client.CoreV1Api() + resource = api.read_node(node_name) + resource.metadata.labels['test-stale'] = None + api.patch_node(node_name, resource) + print("Deleted node label.") + +def deleteApp(namespace="default"): + api = client.CoreV1Api() + while True: + try: + api.delete_namespaced_pod("nginx", namespace) + resource = 
api.read_namespaced_pod("nginx", namespace) + print("App pod still exists...") + time.sleep(1) + except client.exceptions.ApiException as e: + if e.status == 404: + print("Delete pod.") + return + +def checkLabel(node_name): + api = client.CoreV1Api() + resource = api.read_node(node_name) + if (resource.metadata.labels['test-stale'] and resource.metadata.labels['test-stale'] == 'true'): + print("Added label exists.") + return True + else: + print("Added label does not exist.") + return False + +def cleanUp(node_name): + deleteApp() + + custom_api = client.CustomObjectsApi() + custom_api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name="hbase", + namespace=NS, + plural="datasets" + ) + + runtimeDelete = False + while not runtimeDelete: + print("runtime still exists...") + try: + runtime = custom_api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name="hbase", + namespace=NS, + plural="alluxioruntimes" + ) + except client.exceptions.ApiException as e: + if e.status == 404: + runtimeDelete = True + continue + + time.sleep(1) + + api = client.CoreV1Api() + resource = api.read_node(node_name) + resource.metadata.labels['test-stale'] = None + api.patch_node(node_name, resource) + + print("Delete added label.") + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + nodes = getNodes() + if len(nodes.items) == 0: + return 1 + node = nodes.items[0] + + name = "stale-info-check" + namespace = "default" + + dataset = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, name) \ + + runtime = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, name) \ + .set_tieredstore(mediumtype="MEM", path="/dev/shm", quota="2Gi") + + + flow = TestFlow("Common - Test Stale Node Info after CSI patch") + + flow.append_step( + SimpleStep( + step_name="create dataset", + 
forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check dataset bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create data read pod", + forth_fn=currying_fn(createApp, node_name=node.metadata.name, dataset_name=name, namespace=namespace), + back_fn=currying_fn(deleteApp, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if data read pod running", + forth_fn=currying_fn(check_app_run, namespace=namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="patch label on node", + forth_fn=currying_fn(addLabel, node_name=node.metadata.name), + back_fn=currying_fn(delete_label, node_name=node.metadata.name) + ) + ) + + flow.append_step( + SimpleStep( + step_name="recreate data read pod[deleting]", + forth_fn=currying_fn(deleteApp, namespace=namespace), + back_fn=dummy_back + ) + ) + + flow.append_step( + SimpleStep( + step_name="recreate data read pod[creating]", + forth_fn=currying_fn(createApp, node_name=node.metadata.name, dataset_name=name, namespace=namespace), + back_fn=currying_fn(deleteApp, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if data read app running", + forth_fn=currying_fn(check_app_run, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if label still exists", + forth_fn=currying_fn(check_label, node_name=node.metadata.name), + timeout=20 + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + + + + # createDatasetAndRuntime() + # checkDatasetBound() + # createApp() + # node_name = checkAppRun() + # addLabel(node_name) + # deleteApp() + # 
createApp() + # checkAppRun() + # res = checkLabel(node_name) + # cleanUp(node_name) + # print("Has passed? " + str(True)) + # if not res: + # exit(-1) + # return 0 + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test/prow/testcases/common/fuse_recovery.py b/test/prow/testcases/common/fuse_recovery.py new file mode 100644 index 00000000000..c706460342d --- /dev/null +++ b/test/prow/testcases/common/fuse_recovery.py @@ -0,0 +1,222 @@ +""" +TestCase: Recover Fuse +DDC Engine: Alluxio +Steps: +1. check if Fuse Recover is Enabled +2. create Dataset(WebUFS) & Runtime +3. check if dataset is bound +4. check if persistentVolumeClaim & PV is created +5. create data list Pod with Injection label +6. wait until Pod running & check if data list succeed +7. delete Alluxio Fuse Pod +8. check Fuse recovered +9. check if data list succeed +10. clean up +""" +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +from kubernetes import client, config + +from kubernetes.client.rest import ApiException + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, SleepStep, dummy_back, currying_fn + +from kubernetes import client, config + +def getPodNameByPrefix(prefix, pod_namespace): + api = client.CoreV1Api() + pods = api.list_namespaced_pod(pod_namespace) + pods_name = [item.metadata.name for item in pods.items] + for name in pods_name: + if name.__contains__(prefix): + pod_name = name + return pod_name + return None + + +def checkCsiRecoverEnabled() -> bool: + """ + check if csi-nodeplugin-fluid-xxxx pod.spec.containers has args "FuseRecovery=true" + """ + fluid_namespace = "fluid-system" + pod_name = "csi-nodeplugin-fluid" + pod_name = getPodNameByPrefix(pod_name, fluid_namespace) + if pod_name is None: + return False + api = client.CoreV1Api() 
+ for i in range(10): + pod = api.read_namespaced_pod(pod_name, fluid_namespace) + if str(pod.spec.containers).__contains__("FuseRecovery=true"): + print("CSI recovery enabled") + return True + time.sleep(1) + return False + + +def checkFuseRecovered(dataset_name, namespace="default"): + def getFuseRecoveredUids(namespace="default"): + api = client.CoreV1Api() + items = api.list_namespaced_event(namespace=namespace).items + fuseRecoveryUids = set() + for item in items: + if item.message.__contains__("Fuse recover"): + fuseRecoveryUids.add(item.involved_object.uid) + return fuseRecoveryUids + + ### get dataset hbase uid + api = client.CustomObjectsApi() + dataset = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace=namespace, + plural="datasets", + name=dataset_name) + uid = dataset['metadata']['uid'] + print("Dataset uid is: {}".format(uid)) + uids = getFuseRecoveredUids(namespace) + # print("Total uids are: {}".format(uids)) + if uids.__contains__(uid): + print("Fuse Recovered.") + return True + + return False + + +def checkDataListSuccess(pod_name, namespace="default") -> bool: + cmd = "kubectl -n {} exec -it {} ls /data/zookeeper".format(namespace, pod_name) + success = os.system(cmd) + if success == 0: + print("Data Read done.") + return True + else: + print("Data Read Fail") + return False + + +def deleteAlluxioFusePod(dataset_name, namespace="default"): + def deletePodWithPrefix(prefix, pod_namespace): + pod_name = getPodNameByPrefix(prefix, pod_namespace) + api = client.CoreV1Api() + api.delete_namespaced_pod(pod_name, pod_namespace) + time.sleep(5) + print("Delete pod: {}".format(pod_name)) + + deletePodWithPrefix("{}-fuse".format(dataset_name), namespace) + time.sleep(30) + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + + if checkCsiRecoverEnabled() is False: + print("FAIL at checkCsiRecoverEnabled(): FUSE Recover feature 
gate is not enabled") + return 1 + + name = "test-fuse-recover" + namespace = "default" + + dataset = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, name) + + runtime = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, name) \ + + flow = TestFlow("Common - Test FUSE Recover") + + flow.append_step( + SimpleStep( + step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check dataset bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create pod with fluid pvc", + forth_fn=funcs.create_pod_fn(dataset_name=name, name="nginx-test", namespace=namespace, serverful=True), + back_fn=funcs.delete_pod_fn(name="nginx-test", namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check pod status", + forth_fn=funcs.check_pod_running_fn(name="nginx-test", namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="touch FUSE mountpoint", + forth_fn=currying_fn(checkDataListSuccess, pod_name="nginx-test", namespace=namespace), + timeout=5 + ) + ) + + flow.append_step( + SimpleStep( + step_name="delete fuse pod", + forth_fn=currying_fn(deleteAlluxioFusePod, dataset_name=name, namespace=namespace), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if fuse mountpoint is recovered", + forth_fn=currying_fn(checkFuseRecovered, dataset_name=name, namespace=namespace) + ) + ) + + flow.append_step( + 
StatusCheckStep( + step_name="touch FUSE mountpoint", + forth_fn=currying_fn(checkDataListSuccess, pod_name="nginx-test", namespace=namespace), + timeout=5 + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + +if __name__ == '__main__': + main() diff --git a/test/prow/testcases/common/patch_node_label_parallel.py b/test/prow/testcases/common/patch_node_label_parallel.py new file mode 100644 index 00000000000..8f518d47569 --- /dev/null +++ b/test/prow/testcases/common/patch_node_label_parallel.py @@ -0,0 +1,241 @@ +""" +TestCase: Patch Node Label Parallel +DDC Engine: Alluxio +Steps: +1. patch label +2. create dataset and runtime demo1 and demo2 +3. check label +4. delete dataset demo1 and create dataset demo3 +5. check label +6. clean up +""" + +import os +import sys + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +from kubernetes import client, config + +from kubernetes.client.rest import ApiException + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, SleepStep, dummy_back, currying_fn + +from kubernetes import client, config + + +def getNodes(): + api = client.CoreV1Api() + node_list = api.list_node() + return node_list + + +def patchNodeLabel(key, value, node): + api = client.CoreV1Api() + + body = { + "metadata": { + "labels": { + key: value, + } + } + } + + # Patching the node labels + api_response = api.patch_node(node.metadata.name, body) + print("node label: %s\t%s" % (node.metadata.name, node.metadata.labels)) + +def checkLabel(datasets, node): + api = client.CoreV1Api() + try: + latestNode = api.read_node(node.metadata.name) + # print(latestNode) + except ApiException as e: + print("Exception when calling CoreV1Api->read_node: %s\n" % e) + return False + labels = latestNode.metadata.labels + + dataNumKey = "fluid.io/dataset-num" + alluxioKeyPrefix 
= "fluid.io/s-alluxio-default-" + datasetKeyPrefix = "fluid.io/s-default-" + # check dataset number label + # print(labels) + if dataNumKey not in labels or labels[dataNumKey] != str(len(datasets)): + return False + # check alluxio label + for dataset in datasets: + alluxioKey = alluxioKeyPrefix + dataset + datasetKey = datasetKeyPrefix + dataset + if alluxioKey not in labels or labels[alluxioKey] != "true": + return False + if datasetKey not in labels or labels[datasetKey] != "true": + return False + return True + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + # 1. patch label + nodes = getNodes() + if len(nodes.items) == 0: + return 1 + node = nodes.items[0] + + namespace = "default" + + dataset1 = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo1") \ + .set_node_affinity("fluid", "multi-dataset") \ + .set_placement("Shared") + + runtime1 = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo1") \ + .set_tieredstore(mediumtype="MEM", path="/dev/shm", quota="4Gi") + + dataset2 = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo2") \ + .set_node_affinity("fluid", "multi-dataset") \ + .set_placement("Shared") + runtime2 = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo2") \ + .set_tieredstore(mediumtype="MEM", path="/dev/shm", quota="4Gi") + + dataset3 = fluidapi.assemble_dataset("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo3") \ + .set_node_affinity("fluid", "multi-dataset") \ + .set_placement("Shared") + runtime3 = fluidapi.assemble_runtime("alluxio-webufs") \ + .set_namespaced_name(namespace, "demo3") \ + .set_tieredstore(mediumtype="MEM", path="/dev/shm", quota="4Gi") + + flow = TestFlow("Common - Patch Node Label in Parallel") + + flow.append_step( + SimpleStep( + step_name="patch node label", + 
forth_fn=currying_fn(patchNodeLabel, key="fluid", value="multi-dataset", node=node), + back_fn=currying_fn(patchNodeLabel, key="fluid", value=None, node=node) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create dataset demo1", + forth_fn=funcs.create_dataset_fn(dataset1.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime1.dump(), "demo1", namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime demo1", + forth_fn=funcs.create_runtime_fn(runtime1.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + SimpleStep( + step_name="create dataset demo2", + forth_fn=funcs.create_dataset_fn(dataset2.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime2.dump(), "demo2", namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime demo2", + forth_fn=funcs.create_runtime_fn(runtime2.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check dataset demo1 bound", + forth_fn=funcs.check_dataset_bound_fn("demo1", namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check dataset demo2 bound", + forth_fn=funcs.check_dataset_bound_fn("demo2", namespace) + ) + ) + + flow.append_step( + SleepStep( + sleep_seconds=20, + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check node label", + forth_fn=currying_fn(checkLabel, datasets=["demo1", "demo2"], node=node) + ) + ) + + flow.append_step( + SimpleStep( + step_name="clean up dataset demo1", + forth_fn=funcs.delete_dataset_and_runtime_fn(runtime1.dump(), "demo1", namespace), + back_fn=dummy_back + ) + ) + + flow.append_step( + SimpleStep( + step_name="create dataset demo3", + forth_fn=funcs.create_dataset_fn(dataset3.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime3.dump(), "demo3", namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime demo3", + forth_fn=funcs.create_runtime_fn(runtime3.dump()), + back_fn=dummy_back + ) + ) + + 
flow.append_step( + StatusCheckStep( + step_name="check dataset demo3 bound", + forth_fn=funcs.check_dataset_bound_fn("demo3", namespace) + ) + ) + + flow.append_step( + SleepStep( + sleep_seconds=20 + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check node label", + forth_fn=currying_fn(checkLabel, datasets=["demo2", "demo3"], node=node) + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test/prow/testcases/jindofsx/jindo_oss.py b/test/prow/testcases/jindofsx/jindo_oss.py new file mode 100644 index 00000000000..fb92e155c5f --- /dev/null +++ b/test/prow/testcases/jindofsx/jindo_oss.py @@ -0,0 +1,123 @@ +""" +TestCase: Access OSS data after cache warmup +DDC Engine: Jindofsx +Steps: +1. create Dataset & Runtime +2. check if dataset is bound +3. check if persistentVolumeClaim & PV is created +4. submit DataLoad CR +5. wait until DataLoad completes +6. check if dataset cached usage equals to ufs total file size (i.e. Fully cached) +7. 
clean up +""" + +import os +import sys + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, dummy_back + + +from kubernetes import client, config + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + name = "demo-dataset" + namespace = "default" + + dataload_name ="demo-dataset-warmup" + + dataset = fluidapi.assemble_dataset("jindo-oss").set_namespaced_name(namespace, name) + runtime = fluidapi.assemble_runtime("jindo-oss").set_namespaced_name(namespace, name) + dataload = fluidapi.DataLoad(dataload_name, namespace) \ + .set_target_dataset(name, namespace) \ + .set_load_metadata(True) + + flow = TestFlow("JindoFS - Access OSS data") + + flow.append_step( + SimpleStep( + step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataset is bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create dataload", + forth_fn=funcs.create_dataload_fn(dataload.dump()), + back_fn=dummy_back, # DataLoad should have ownerReference of Dataset + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataload job completes", + forth_fn=funcs.check_dataload_job_status_fn(dataload_name, namespace) + ) + ) + + flow.append_step( 
+ StatusCheckStep( + step_name="check if the whole dataset is warmed up", + forth_fn=funcs.check_dataset_cached_percentage_fn(name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create data read job", + forth_fn=funcs.create_job_fn(script="time cp -r /data/ /tmp-data", dataset_name=name, namespace=namespace), + back_fn=funcs.delete_job_fn() + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check data read job status", + forth_fn=funcs.check_job_status_fn() + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + + +if __name__ == '__main__': + main() diff --git a/test/prow/testcases/juicefs/juicefs_minio.py b/test/prow/testcases/juicefs/juicefs_minio.py new file mode 100644 index 00000000000..369e66ee7a7 --- /dev/null +++ b/test/prow/testcases/juicefs/juicefs_minio.py @@ -0,0 +1,477 @@ +# Copyright 2022 The Fluid Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TestCase: Pod accesses Juicefs data +DDC Engine: Juicefs(Community) with local redis and minio + +Prerequisite: +1. 
apply minio service and deployment: +``` +apiVersion: v1 +kind: Service +metadata: + name: minio +spec: + type: ClusterIP + ports: + - port: 9000 + targetPort: 9000 + protocol: TCP + selector: + app: minio +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + # This name uniquely identifies the Deployment + name: minio +spec: + selector: + matchLabels: + app: minio + strategy: + type: Recreate + template: + metadata: + labels: + # Label is used as selector in the service. + app: minio + spec: + containers: + - name: minio + # Pulls the default Minio image from Docker Hub + image: minio/minio + args: + - server + - /data + env: + # Minio access key and secret key + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + ports: + - containerPort: 9000 + hostPort: 9000 +``` +2. apply redis service and deployment: +``` +apiVersion: v1 +kind: Service +metadata: + name: redis +spec: + type: ClusterIP + ports: + - port: 6379 + targetPort: 6379 + protocol: TCP + selector: + app: redis +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + # This name uniquely identifies the Deployment + name: redis +spec: + selector: + matchLabels: + app: redis + strategy: + type: Recreate + template: + metadata: + labels: + # Label is used as selector in the service. + app: redis + spec: + containers: + - name: redis + # Pulls the default Redis image from Docker Hub + image: redis + ports: + - containerPort: 6379 + hostPort: 6379 +``` + +Steps: +1. create Dataset & Runtime +2. check if dataset is bound +3. check if PVC & PV is created +4. submit data write job +5. wait until data write job completes +6. submit data read job +7. check if data content consistent +8. 
clean up +""" + +import os +import sys +import time + +project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) +sys.path.insert(0, project_root) + +import fluid.fluidapi as fluidapi +import fluid.step_funcs as funcs +from framework.testflow import TestFlow +from framework.step import SimpleStep, StatusCheckStep, dummy_back, currying_fn +from framework.exception import TestError + +from kubernetes import client, config + +NODE_IP = "minio" +APP_NAMESPACE = "default" +SECRET_NAME = "jfs-secret" + +NODE_NAME = "" + + +def create_redis_secret(namespace="default"): + api = client.CoreV1Api() + jfs_secret = { + "apiVersion": "v1", + "kind": "Secret", + "metadata": {"name": SECRET_NAME}, + "stringData": {"metaurl": "redis://redis:6379/0", "accesskey": "minioadmin", "secretkey": "minioadmin"} + } + + api.create_namespaced_secret(namespace=namespace, body=jfs_secret) + print("Created secret {}".format(SECRET_NAME)) + + +def create_datamigrate(datamigrate_name, dataset_name, namespace="default"): + api = client.CustomObjectsApi() + my_datamigrate = { + "apiVersion": "data.fluid.io/v1alpha1", + "kind": "DataMigrate", + "metadata": {"name": datamigrate_name, "namespace": namespace}, + "spec": { + "image": "registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse", + "imageTag": "nightly", + "from": { + "dataset": {"name": dataset_name, "namespace": namespace} + }, + "to": {"externalStorage": { + "uri": "minio://%s:9000/minio/test/" % NODE_IP, + "encryptOptions": [ + {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}}, + {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}}, + ] + }} + }, + } + + api.create_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + namespace="default", + plural="datamigrates", + body=my_datamigrate, + ) + print("Create datamigrate {}".format(datamigrate_name)) + + +def check_datamigrate_complete(datamigrate_name, 
namespace="default"): + api = client.CustomObjectsApi() + + resource = api.get_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=datamigrate_name, + namespace=namespace, + plural="datamigrates" + ) + + if "status" in resource: + if "phase" in resource["status"]: + if resource["status"]["phase"] == "Complete": + print("Datamigrate {} is complete.".format(datamigrate_name)) + return True + + return False + + +def get_worker_node_fn(dataset_name, namespace="default"): + def check_internal(): + api = client.CoreV1Api() + pod_name = "{}-worker-0".format(dataset_name) + try: + global NODE_NAME + pod = api.read_namespaced_pod(name=pod_name, namespace=namespace) + NODE_NAME = pod.spec.node_name + return True + except client.exceptions.ApiException as e: + if e.status == 404: + return False + raise e + + return funcs.check(check_internal, retries=60, interval=1) + + +def create_check_cache_job(job_name, node_name, namespace="default"): + if node_name == "": + raise TestError("cannot check cache cleaned up given an empty node") + + print("Create check cache job") + api = client.BatchV1Api() + + container = client.V1Container( + name="demo", + image="debian:buster", + command=["/bin/bash"], + args=["-c", "if [ $(find /dev/shm/* | grep chunks | wc -l) = 0 ]; then exit 0; else exit 1; fi"], + volume_mounts=[client.V1VolumeMount(mount_path="/dev/shm/cache1", name="cache1"), + client.V1VolumeMount(mount_path="/dev/shm/cache2", name="cache2")] + ) + + template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": "checkcache"}), + spec=client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ + client.V1Volume( + name="cache1", + host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache1") + ), + client.V1Volume( + name="cache2", + host_path=client.V1HostPathVolumeSource(path="/dev/shm/cache2") + ) + ], + node_name=node_name, + ) + ) + + spec = client.V1JobSpec(template=template, backoff_limit=4) + + job 
= client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name, namespace=namespace), + spec=spec + ) + + api.create_namespaced_job(namespace=namespace, body=job) + print("Job {} created.".format("checkcache")) + + +def check_data_job_status(job_name, namespace="default"): + api = client.BatchV1Api() + + count = 0 + while count < 300: + count += 1 + response = api.read_namespaced_job_status(name=job_name, namespace=namespace) + if response.status.succeeded is not None: + print("Job {} completed.".format(job_name)) + return True + if response.status.failed is not None: + print("Job {} failed.".format(job_name)) + return False + time.sleep(1) + print("Job {} not completed within 300s.".format(job_name)) + return False + + +def clean_job(job_name, namespace="default"): + batch_api = client.BatchV1Api() + + # See https://github.com/kubernetes-client/python/issues/234 + body = client.V1DeleteOptions(propagation_policy='Background') + try: + batch_api.delete_namespaced_job(name=job_name, namespace=namespace, body=body) + except client.exceptions.ApiException as e: + if e.status == 404: + print("job {} deleted".format(job_name)) + return True + + count = 0 + while count < 300: + count += 1 + print("job {} still exists...".format(job_name)) + try: + batch_api.read_namespaced_job(name=job_name, namespace=namespace) + except client.exceptions.ApiException as e: + if e.status == 404: + print("job {} deleted".format(job_name)) + return True + time.sleep(1) + + print("job {} not deleted within 300s".format(job_name)) + return False + + +def clean_up_datamigrate(datamigrate_name, namespace): + custom_api = client.CustomObjectsApi() + custom_api.delete_namespaced_custom_object( + group="data.fluid.io", + version="v1alpha1", + name=datamigrate_name, + namespace=namespace, + plural="datamigrates" + ) + print("Datamigrate {} deleted".format(datamigrate_name)) + + +def clean_up_secret(namespace="default"): + core_api = client.CoreV1Api() + try: + 
core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=namespace) + except client.ApiException as e: + if e.status != 404: + raise e + print("secret {} is cleaned up".format(SECRET_NAME)) + + +def main(): + if os.getenv("KUBERNETES_SERVICE_HOST") is None: + config.load_kube_config() + else: + config.load_incluster_config() + + name = "jfsdemo" + datamigrate_name = "jfsdemo" + namespace = "default" + test_write_job = "demo-write" + test_read_job = "demo-read" + check_cache_job = "checkcache" + + dataset = fluidapi.assemble_dataset("juicefs-minio") \ + .set_namespaced_name(namespace, name) + runtime = fluidapi.assemble_runtime("juicefs-minio") \ + .set_namespaced_name(namespace, name) + + flow = TestFlow("JuiceFS - Access Minio data") + + flow.append_step( + SimpleStep( + step_name="create jfs secrets", + forth_fn=currying_fn(create_redis_secret, namespace=namespace), + back_fn=currying_fn(clean_up_secret, namespace=namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create dataset", + forth_fn=funcs.create_dataset_fn(dataset.dump()), + back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name=name, namespace=namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create runtime", + forth_fn=funcs.create_runtime_fn(runtime.dump()), + back_fn=dummy_back + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if dataset is bound", + forth_fn=funcs.check_dataset_bound_fn(name, namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if PV & PVC is ready", + forth_fn=funcs.check_volume_resource_ready_fn(name, namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="get juicefs worker node info", + forth_fn=get_worker_node_fn(dataset_name=name, namespace=namespace), + back_fn=dummy_back + ) + ) + + flow.append_step( + SimpleStep( + step_name="create data write job", + forth_fn=funcs.create_job_fn(script="dd if=/dev/zero of=/data/allzero.file bs=100M count=10 && sha256sum 
/data/allzero.file", dataset_name=name, name=test_write_job, namespace=namespace), + back_fn=funcs.delete_job_fn(name=test_write_job, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check data write job status", + forth_fn=funcs.check_job_status_fn(name=test_write_job, namespace=namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create data read job", + forth_fn=funcs.create_job_fn(script="time sha256sum /data/allzero.file && rm /data/allzero.file", dataset_name=name, name=test_read_job, namespace=namespace), + back_fn=funcs.delete_job_fn(name=test_read_job, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check data read job status", + forth_fn=funcs.check_job_status_fn(name=test_read_job, namespace=namespace) + ) + ) + + flow.append_step( + SimpleStep( + step_name="create DataMigrate job", + forth_fn=currying_fn(create_datamigrate, datamigrate_name=datamigrate_name, dataset_name=name, namespace=namespace), + back_fn=dummy_back + # No need to clean up DataMigrate because of its ownerReference + # back_fn=currying_fn(clean_up_datamigrate, datamigrate_name=datamigrate_name, namespace=namespace) + ) + ) + + flow.append_step( + StatusCheckStep( + step_name="check if DataMigrate succeeds", + forth_fn=currying_fn(check_datamigrate_complete, datamigrate_name=datamigrate_name, namespace=namespace) + ) + ) + + try: + flow.run() + except Exception as e: + print(e) + exit(1) + + + print("> Post-Check: Cache cleaned up?") + try: + assert(NODE_NAME != "") + create_check_cache_job(job_name=check_cache_job, node_name=NODE_NAME, namespace=namespace) + if not check_data_job_status(check_cache_job, namespace=namespace): + raise Exception("> FAIL: Job {} in normal mode failed.".format("checkcache")) + except Exception as e: + print(e) + exit(1) + finally: + # clean up check cache job + clean_job(check_cache_job, namespace=namespace) + print("> Post-Check: PASSED") + + +if __name__ == '__main__': + 
main() diff --git a/test/prow/testcases/juicefs/juicefs_minio_sidecar.py b/test/prow/testcases/juicefs/juicefs_minio_sidecar.py new file mode 100644 index 00000000000..bd510df3508 --- /dev/null +++ b/test/prow/testcases/juicefs/juicefs_minio_sidecar.py @@ -0,0 +1,342 @@ +# Copyright 2022 The Fluid Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +TestCase: Pod accesses Juicefs data +DDC Engine: Juicefs(Community) with local redis and minio + +Prerequisite: +1. apply minio service and deployment: +``` +apiVersion: v1 +kind: Service +metadata: + name: minio +spec: + type: ClusterIP + ports: + - port: 9000 + targetPort: 9000 + protocol: TCP + selector: + app: minio +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + # This name uniquely identifies the Deployment + name: minio +spec: + selector: + matchLabels: + app: minio + strategy: + type: Recreate + template: + metadata: + labels: + # Label is used as selector in the service. + app: minio + spec: + containers: + - name: minio + # Pulls the default Minio image from Docker Hub + image: minio/minio + args: + - server + - /data + env: + # Minio access key and secret key + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + ports: + - containerPort: 9000 + hostPort: 9000 +``` +2. 
def create_redis_secret(namespace="default"):
    """Create the Secret holding the JuiceFS metadata URL and MinIO credentials.

    The Secret is named ``SECRET_NAME`` and carries three ``stringData``
    entries: ``metaurl``, ``accesskey`` and ``secretkey``.

    Args:
        namespace: Namespace in which the Secret is created.
    """
    api = client.CoreV1Api()
    jfs_secret = {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": SECRET_NAME},
        "stringData": {
            "metaurl": "redis://redis:6379/0",
            "accesskey": "minioadmin",
            "secretkey": "minioadmin",
        },
    }

    api.create_namespaced_secret(namespace=namespace, body=jfs_secret)
    print("Created secret {}".format(SECRET_NAME))


def create_datamigrate(datamigrate_name, dataset_name, namespace="default"):
    """Create a DataMigrate object that copies a dataset to MinIO.

    The migration source is the dataset ``dataset_name`` in ``namespace``; the
    target is the external storage URI ``minio://<NODE_IP>:9000/minio/test/``,
    whose access/secret keys are read from the Secret ``SECRET_NAME``.

    Args:
        datamigrate_name: Name of the DataMigrate object to create.
        dataset_name: Name of the source dataset.
        namespace: Namespace of both the dataset and the DataMigrate object.
    """
    api = client.CustomObjectsApi()
    my_datamigrate = {
        "apiVersion": "data.fluid.io/v1alpha1",
        "kind": "DataMigrate",
        "metadata": {"name": datamigrate_name, "namespace": namespace},
        "spec": {
            "image": "registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse",
            "imageTag": "nightly",
            "from": {
                "dataset": {"name": dataset_name, "namespace": namespace}
            },
            "to": {"externalStorage": {
                "uri": "minio://%s:9000/minio/test/" % NODE_IP,
                "encryptOptions": [
                    {"name": "access-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "accesskey"}}},
                    {"name": "secret-key", "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": "secretkey"}}},
                ]
            }}
        },
    }

    # The request namespace must match metadata.namespace in the body; it was
    # previously hard-coded to "default", which ignored the caller's namespace.
    api.create_namespaced_custom_object(
        group="data.fluid.io",
        version="v1alpha1",
        namespace=namespace,
        plural="datamigrates",
        body=my_datamigrate,
    )
    print("Create datamigrate {}".format(datamigrate_name))


def check_datamigrate_complete(datamigrate_name, namespace="default"):
    """Report whether a DataMigrate object has reached the ``Complete`` phase.

    Args:
        datamigrate_name: Name of the DataMigrate object to inspect.
        namespace: Namespace the object lives in.

    Returns:
        True if ``status.phase`` equals ``"Complete"``; False if the phase has
        any other value or if ``status``/``phase`` is not present yet.
    """
    api = client.CustomObjectsApi()

    resource = api.get_namespaced_custom_object(
        group="data.fluid.io",
        version="v1alpha1",
        name=datamigrate_name,
        namespace=namespace,
        plural="datamigrates"
    )

    # "status" may be missing (or empty) until the object has been processed.
    phase = (resource.get("status") or {}).get("phase")
    if phase == "Complete":
        print("Datamigrate {} is complete.".format(datamigrate_name))
        return True

    return False


def clean_up_datamigrate(datamigrate_name, namespace):
    """Delete the named DataMigrate object.

    Args:
        datamigrate_name: Name of the DataMigrate object to delete.
        namespace: Namespace the object lives in.
    """
    custom_api = client.CustomObjectsApi()
    custom_api.delete_namespaced_custom_object(
        group="data.fluid.io",
        version="v1alpha1",
        name=datamigrate_name,
        namespace=namespace,
        plural="datamigrates"
    )
    print("Datamigrate {} deleted".format(datamigrate_name))


def clean_up_secret(namespace="default"):
    """Delete the Secret ``SECRET_NAME``, tolerating its absence.

    Args:
        namespace: Namespace the Secret lives in.

    Raises:
        client.ApiException: If deletion fails with any status other than 404.
    """
    core_api = client.CoreV1Api()
    try:
        core_api.delete_namespaced_secret(name=SECRET_NAME, namespace=namespace)
    except client.ApiException as e:
        # A 404 means the secret is already gone, which is the desired state.
        if e.status != 404:
            raise
    print("secret {} is cleaned up".format(SECRET_NAME))
def main():
    """Build and run the "JuiceFS - Access Minio data" test flow.

    The flow appends, in order: secret creation, dataset and runtime creation,
    dataset-bound and PV/PVC readiness checks, a data write job and a data read
    job (both created with ``serverless=True``) with their status checks, and
    finally a DataMigrate job with its completion check.

    If running the flow raises, the error is printed and the process exits
    with status 1.
    """
    # KUBERNETES_SERVICE_HOST is unset when running outside a pod; fall back
    # to the local kubeconfig in that case.
    if os.getenv("KUBERNETES_SERVICE_HOST") is None:
        config.load_kube_config()
    else:
        config.load_incluster_config()

    name = "jfsdemo-sidecar"
    datamigrate_name = "jfsdemo"
    namespace = "default"
    test_write_job = "demo-write-sidecar"
    test_read_job = "demo-read-sidecar"

    dataset = fluidapi.assemble_dataset("juicefs-minio") \
        .set_namespaced_name(namespace, name)
    runtime = fluidapi.assemble_runtime("juicefs-minio") \
        .set_namespaced_name(namespace, name)

    flow = TestFlow("JuiceFS - Access Minio data")

    flow.append_step(
        SimpleStep(
            step_name="create jfs secrets",
            forth_fn=currying_fn(create_redis_secret, namespace=namespace),
            back_fn=currying_fn(clean_up_secret, namespace=namespace)
        )
    )

    # The dataset step's rollback removes both dataset and runtime, so the
    # runtime step below only needs a no-op rollback.
    flow.append_step(
        SimpleStep(
            step_name="create dataset",
            forth_fn=funcs.create_dataset_fn(dataset.dump()),
            back_fn=funcs.delete_dataset_and_runtime_fn(runtime.dump(), name=name, namespace=namespace)
        )
    )

    flow.append_step(
        SimpleStep(
            step_name="create runtime",
            forth_fn=funcs.create_runtime_fn(runtime.dump()),
            back_fn=dummy_back
        )
    )

    flow.append_step(
        StatusCheckStep(
            step_name="check if dataset is bound",
            forth_fn=funcs.check_dataset_bound_fn(name, namespace)
        )
    )

    flow.append_step(
        StatusCheckStep(
            step_name="check if PV & PVC is ready",
            forth_fn=funcs.check_volume_resource_ready_fn(name, namespace)
        )
    )

    flow.append_step(
        SimpleStep(
            step_name="create data write job",
            forth_fn=funcs.create_job_fn(script="dd if=/dev/zero of=/data/allzero.file bs=100M count=10 && sha256sum /data/allzero.file", dataset_name=name, name=test_write_job, namespace=namespace, serverless=True),
            back_fn=funcs.delete_job_fn(name=test_write_job, namespace=namespace)
        )
    )

    flow.append_step(
        StatusCheckStep(
            step_name="check data write job status",
            forth_fn=funcs.check_job_status_fn(name=test_write_job, namespace=namespace)
        )
    )

    flow.append_step(
        SimpleStep(
            step_name="create data read job",
            forth_fn=funcs.create_job_fn(script="time sha256sum /data/allzero.file && rm /data/allzero.file", dataset_name=name, name=test_read_job, namespace=namespace, serverless=True),
            back_fn=funcs.delete_job_fn(name=test_read_job, namespace=namespace)
        )
    )

    flow.append_step(
        StatusCheckStep(
            step_name="check data read job status",
            forth_fn=funcs.check_job_status_fn(name=test_read_job, namespace=namespace)
        )
    )

    flow.append_step(
        SimpleStep(
            step_name="create DataMigrate job",
            forth_fn=currying_fn(create_datamigrate, datamigrate_name=datamigrate_name, dataset_name=name, namespace=namespace),
            # No need to clean up DataMigrate because of its ownerReference
            back_fn=dummy_back
        )
    )

    flow.append_step(
        StatusCheckStep(
            step_name="check if DataMigrate succeeds",
            forth_fn=currying_fn(check_datamigrate_complete, datamigrate_name=datamigrate_name, namespace=namespace)
        )
    )

    try:
        flow.run()
    except Exception as e:
        print(e)
        # sys.exit rather than the `site` builtin exit(), which is meant for
        # the interactive shell and is unavailable under `python -S`.
        sys.exit(1)


if __name__ == '__main__':
    main()