From bf5bc3562769cd875d1e54a3047f00fbe1fc7b97 Mon Sep 17 00:00:00 2001 From: prajinkhadka <34759925+prajinkhadka@users.noreply.github.com> Date: Wed, 6 Nov 2024 19:42:18 -0600 Subject: [PATCH] Container support for AWS (#205) * [Feat] Add build base docker image as abstract method. * [Feat] Update aws to build the base docker image. * [Feat] Add Container deployment flag. To Do [Check for the valid providers that support container deployment] * [Feat] Add Container deployment support. * [Feat] Add support for the base docker image AWS (python) * [Feat] Code Refactoring. * [Feat] Added refactoring to-do tasks. * [Feat] 1. Return container deployment flag, container URI. 2. Add create function with container URI * [Feat] Refactored the code. To Do: Need to add entrypoint for docker aws. * Add entry point in the Dockerfile * Maintain cache for the container deployment. * Add correct entrypoint in dockerfile for lambda * Remove Whitespaces * Docker client login through user-provided credentials, refactor ECR authorization, and raise NotImplementedError for GCP and Azure * Fix typo * Minor fixes: Getting docker username, password from config * Minor fixes: Only show the push image error once * Minor fixes: Refactor some print debugging statements. * Get repository name from Config. * Get repository name from Config if the user has provided one, else generate randomly * Refactoring * Refactored the create function with simple if/else * Linter and formatter * Removed the parsing for repository name and Image tag. We can use it directly, no need to parse again. * Linter and formatter * Updating function based on container deployment and store the code hash for cache * Refactored * Added container deployment as CLI option and update the docs (usage.md) * Linter and formatter * Linter and formatter * Linter and formatter * Add new parameters (container deployment and URI) to update_function docs. * Dockerfile for Node.js * Add Empty requirements * Add Empty requirements * Black formatter * Black formatter * Black formatter * Black formatter * Black formatter * Fail early if container deployment is set for platforms other than AWS. * Black formatter * Minor Fix. * Minor Fix.
* Remove unnecessary spaces * Use resource id instead of random name for ECR repository * [Feat] Linting and rebase errors * [aws] Make sure to fail when we cannot log in to ECR * [aws] Reorganize ECR repository creation * [aws] Add import of rich (nice display) and boto3 stubs for ECR * [aws] Do not cache Docker passwords * [aws] Remove unnecessary config * [aws] Remove unnecessary Docker login * [aws] Customize function name for container deployment * [aws] remove debug output * [aws] [system] Extend cache system to support containers * [aws] Fix typo in the Docker image * [aws] Build arm64 Docker images * [system] Supporting proper caching of containers with different architectures * [aws] Prune pip cache after building the image * [aws] Remove unnecessary package * [aws] CodeRabbit fixes * [aws] Adjust other implementations to a new interface * [aws] Move container implementation to a separate class * [aws] Fix CodeRabbit issue * [aws] [whisk] Fuse two container implementations * [tools] Fix incorrect handling of False values * [system] Reorient container definition to distinguish between AWS and OpenWhisk support * [system] Add storage configuration option for benchmark invoke * [system] Disable rich output for regression * [benchmark] Separate directory for concurrent build of code package and container * [system] Extend regression to support containers * [system] Provide documentation on multi-platform builds * [aws] Update docs --------- Co-authored-by: Marcin Copik --- .../120.uploader/python/requirements.txt | 0 .../311.compression/python/requirements.txt | 0 benchmarks/wrappers/aws/python/setup.py | 15 ++ config/example.json | 9 +- config/systems.json | 111 +++++---- dockerfiles/aws/nodejs/Dockerfile.function | 9 + dockerfiles/aws/python/Dockerfile.function | 22 ++ docs/build.md | 2 +- docs/modularity.md | 2 +- docs/platforms.md | 10 + docs/usage.md | 3 +- requirements.aws.txt | 3 +- requirements.txt | 1 + sebs.py | 78 ++++--- sebs/aws/aws.py | 173 +++++++++----- sebs/aws/config.py | 108 ++++++++- sebs/aws/container.py | 79 +++++++ sebs/azure/azure.py | 37 ++- sebs/benchmark.py | 62 +++-- sebs/cache.py | 115 ++++++++-- sebs/config.py | 36 ++- sebs/experiments/config.py | 7 + sebs/experiments/invocation_overhead.py | 2 +- sebs/faas/container.py | 211 ++++++++++++++++++ sebs/faas/system.py | 79 ++++++- sebs/gcp/gcp.py | 35 ++- sebs/local/local.py | 25 ++- sebs/openwhisk/container.py | 46 ++++ sebs/openwhisk/openwhisk.py | 148 +++--------- sebs/regression.py | 48 +++- sebs/sebs.py | 13 +- sebs/utils.py | 2 +- 32 files changed, 1149 insertions(+), 342 deletions(-) create mode 100644 benchmarks/100.webapps/120.uploader/python/requirements.txt create mode 100644 benchmarks/300.utilities/311.compression/python/requirements.txt create mode 100644 benchmarks/wrappers/aws/python/setup.py create mode 100644 dockerfiles/aws/nodejs/Dockerfile.function create mode 100644 dockerfiles/aws/python/Dockerfile.function create mode 100644 sebs/aws/container.py create mode 100644 sebs/faas/container.py create mode 100644 sebs/openwhisk/container.py diff --git a/benchmarks/100.webapps/120.uploader/python/requirements.txt b/benchmarks/100.webapps/120.uploader/python/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/benchmarks/300.utilities/311.compression/python/requirements.txt b/benchmarks/300.utilities/311.compression/python/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/benchmarks/wrappers/aws/python/setup.py
b/benchmarks/wrappers/aws/python/setup.py new file mode 100644 index 00000000..b3d87835 --- /dev/null +++ b/benchmarks/wrappers/aws/python/setup.py @@ -0,0 +1,15 @@ +from distutils.core import setup +from glob import glob +from pkg_resources import parse_requirements + +with open('requirements.txt') as f: + requirements = [str(r) for r in parse_requirements(f)] + +setup( + name='function', + install_requires=requirements, + packages=['function'], + package_dir={'function': '.'}, + package_data={'function': glob('**', recursive=True)}, +) + diff --git a/config/example.json b/config/example.json index 95543700..ea62910d 100644 --- a/config/example.json +++ b/config/example.json @@ -1,13 +1,14 @@ { "experiments": { - "deployment": "openwhisk", + "deployment": "aws", "update_code": false, "update_storage": false, - "download_results": false, + "download_results": false, "architecture": "arm64", + "container_deployment": true, "runtime": { - "language": "nodejs", - "version": "16" + "language": "python", + "version": "3.8" }, "type": "invocation-overhead", "perf-cost": { diff --git a/config/systems.json b/config/systems.json index 21e22768..0ce22383 100644 --- a/config/systems.json +++ b/config/systems.json @@ -55,16 +55,25 @@ } } }, - "architecture": ["x64"] + "architecture": ["x64"], + "deployments": ["package"] }, "aws": { "languages": { "python": { "base_images": { - "3.11": "amazon/aws-lambda-python:3.11", - "3.10": "amazon/aws-lambda-python:3.10", - "3.9": "amazon/aws-lambda-python:3.9", - "3.8": "amazon/aws-lambda-python:3.8" + "x64": { + "3.11": "amazon/aws-lambda-python:3.11", + "3.10": "amazon/aws-lambda-python:3.10", + "3.9": "amazon/aws-lambda-python:3.9", + "3.8": "amazon/aws-lambda-python:3.8" + }, + "arm64": { + "3.11": "amazon/aws-lambda-python:3.11.2024.05.23.17", + "3.10": "amazon/aws-lambda-python:3.10.2024.06.19.11", + "3.9": "amazon/aws-lambda-python:3.9.2024.05.20.23", + "3.8": "amazon/aws-lambda-python:3.8.2024.09.05.16" + } }, "images": [ "build" @@ -72,14 +81,20 @@ "deployment": { "files": [ "handler.py", - "storage.py" + "storage.py", + "setup.py" ], "packages": [] } }, "nodejs": { "base_images": { - "16": "amazon/aws-lambda-nodejs:16" + "x64": { + "16": "amazon/aws-lambda-nodejs:16" + }, + "arm64": { + "16": "amazon/aws-lambda-nodejs:16.2024.09.06.13" + } }, "images": [ "build" @@ -95,17 +110,20 @@ } } }, - "architecture": ["x64", "arm64"] + "architecture": ["x64", "arm64"], + "deployments": ["package", "container"] }, "azure": { "languages": { "python": { "base_images": { - "3.7": "mcr.microsoft.com/azure-functions/python:3.0-python3.7", - "3.8": "mcr.microsoft.com/azure-functions/python:3.0-python3.8", - "3.9": "mcr.microsoft.com/azure-functions/python:3.0-python3.9", - "3.10": "mcr.microsoft.com/azure-functions/python:4-python3.10", - "3.11": "mcr.microsoft.com/azure-functions/python:4-python3.11" + "x64": { + "3.7": "mcr.microsoft.com/azure-functions/python:3.0-python3.7", + "3.8": "mcr.microsoft.com/azure-functions/python:3.0-python3.8", + "3.9": "mcr.microsoft.com/azure-functions/python:3.0-python3.9", + "3.10": "mcr.microsoft.com/azure-functions/python:4-python3.10", + "3.11": "mcr.microsoft.com/azure-functions/python:4-python3.11" + } }, "images": [ "build" @@ -123,9 +141,11 @@ }, "nodejs": { "base_images": { - "16": "mcr.microsoft.com/azure-functions/node:4-node16", - "18": "mcr.microsoft.com/azure-functions/node:4-node18", - "20": "mcr.microsoft.com/azure-functions/node:4-node20" + "x64": { + "16": "mcr.microsoft.com/azure-functions/node:4-node16", + "18": 
"mcr.microsoft.com/azure-functions/node:4-node18", + "20": "mcr.microsoft.com/azure-functions/node:4-node20" + } }, "images": [ "build" @@ -148,18 +168,21 @@ "username": "docker_user" } }, - "architecture": ["x64"] + "architecture": ["x64"], + "deployments": ["package"] }, "gcp": { "languages": { "python": { "base_images": { - "3.7": "ubuntu:22.04", - "3.8": "ubuntu:22.04", - "3.9": "ubuntu:22.04", - "3.10": "ubuntu:22.04", - "3.11": "ubuntu:22.04", - "3.12": "ubuntu:22.04" + "x64": { + "3.7": "ubuntu:22.04", + "3.8": "ubuntu:22.04", + "3.9": "ubuntu:22.04", + "3.10": "ubuntu:22.04", + "3.11": "ubuntu:22.04", + "3.12": "ubuntu:22.04" + } }, "images": [ "build" @@ -177,12 +200,14 @@ }, "nodejs": { "base_images": { - "10": "ubuntu:18.04", - "12": "ubuntu:18.04", - "14": "ubuntu:18.04", - "16": "ubuntu:18.04", - "18": "ubuntu:22.04", - "20": "ubuntu:22.04" + "x64": { + "10": "ubuntu:18.04", + "12": "ubuntu:18.04", + "14": "ubuntu:18.04", + "16": "ubuntu:18.04", + "18": "ubuntu:22.04", + "20": "ubuntu:22.04" + } }, "images": [ "build" @@ -200,16 +225,19 @@ } } }, - "architecture": ["x64"] + "architecture": ["x64"], + "deployments": ["package"] }, "openwhisk": { "languages": { "python": { "base_images": { - "3.7": "openwhisk/action-python-v3.7", - "3.9": "openwhisk/action-python-v3.9", - "3.10": "openwhisk/action-python-v3.10", - "3.11": "openwhisk/action-python-v3.11" + "x64": { + "3.7": "openwhisk/action-python-v3.7", + "3.9": "openwhisk/action-python-v3.9", + "3.10": "openwhisk/action-python-v3.10", + "3.11": "openwhisk/action-python-v3.11" + } }, "images": [ "function" @@ -228,11 +256,13 @@ }, "nodejs": { "base_images": { - "10": "openwhisk/action-nodejs-v10", - "12": "openwhisk/action-nodejs-v12", - "14": "openwhisk/action-nodejs-v14", - "18": "openwhisk/action-nodejs-v18", - "20": "openwhisk/action-nodejs-v20" + "x64": { + "10": "openwhisk/action-nodejs-v10", + "12": "openwhisk/action-nodejs-v12", + "14": "openwhisk/action-nodejs-v14", + "18": "openwhisk/action-nodejs-v18", + "20": "openwhisk/action-nodejs-v20" + } }, "images": [ "function" @@ -249,6 +279,7 @@ } } }, - "architecture": ["x64"] + "architecture": ["x64"], + "deployments": ["container"] } } diff --git a/dockerfiles/aws/nodejs/Dockerfile.function b/dockerfiles/aws/nodejs/Dockerfile.function new file mode 100644 index 00000000..049dae7d --- /dev/null +++ b/dockerfiles/aws/nodejs/Dockerfile.function @@ -0,0 +1,9 @@ +ARG BASE_IMAGE +FROM $BASE_IMAGE +COPY . function/ +COPY handler.js . +RUN cd function \ + && npm install --no-package-lock --production \ + && npm cache clean --force + +CMD ["handler.handler"] diff --git a/dockerfiles/aws/python/Dockerfile.function b/dockerfiles/aws/python/Dockerfile.function new file mode 100644 index 00000000..65ec420b --- /dev/null +++ b/dockerfiles/aws/python/Dockerfile.function @@ -0,0 +1,22 @@ +ARG BASE_IMAGE +FROM $BASE_IMAGE +ARG VERSION +ENV PYTHON_VERSION=${VERSION} + +COPY . function/ + +RUN touch function/__init__.py +RUN if test -f "function/requirements.txt.${PYTHON_VERSION}"; then \ + pip install --no-cache-dir \ + -r function/requirements.txt \ + -r function/requirements.txt.${PYTHON_VERSION} \ + function/ && \ + pip cache purge; \ + else \ + pip install --no-cache-dir \ + -r function/requirements.txt \ + function/ && \ + pip cache purge; \ + fi + +CMD ["function/handler.handler"] diff --git a/docs/build.md b/docs/build.md index c6c293aa..528533a1 100644 --- a/docs/build.md +++ b/docs/build.md @@ -42,7 +42,7 @@ JSON configuration files are needed. 
**Build Docker Image** - in this step, we create a new image `function.{platform}.{benchmark}.{language}-{version}`. Benchmark and all of its dependencies are installed there, and the image can be deployed directly -to the serverless platform. At the moment, this step is used only in OpenWhisk. +to the serverless platform. At the moment, this step is used only on AWS and in OpenWhisk. ## Docker Image Build diff --git a/docs/modularity.md b/docs/modularity.md index 7e3c7fcc..4febc1b7 100644 --- a/docs/modularity.md +++ b/docs/modularity.md @@ -340,7 +340,7 @@ This function has been retrieved from the cache and requires refreshing function In practice, this is often limited to updating logging handlers - see existing implementations for details. ```python - def update_function(self, function: Function, code_package: Benchmark): + def update_function(self, function: Function, code_package: Benchmark, container_deployment: bool, container_uri: str): ``` This function updates the function's code and configuration in the platform. diff --git a/docs/platforms.md b/docs/platforms.md index 27738b6e..83a94934 100644 --- a/docs/platforms.md +++ b/docs/platforms.md @@ -12,6 +12,16 @@ points for each platform. > [!WARNING] > On many platforms, credentials can be provided as environment variables or through the SeBS configuration. SeBS will not store your credentials in the cache. When saving results, SeBS stores user benchmark and experiment configuration for documentation and reproducibility, except for credentials that are erased. If you provide the credentials through JSON input configuration, do not commit nor publish these files anywhere. +### Architectures + +By default, SeBS builds functions for the x64 (x86_64) architecture. On AWS, functions can also be built and deployed for ARM CPUs to benefit from the Graviton CPUs available on Lambda. +This change primarily affects functions that make use of dependencies with native builds, such as `torch`, `numpy` or `ffmpeg`. + +Such functions can be built as code packages on any platform, as we rely on package managers like pip and npm to provide binary dependencies. +However, special care is needed to build Docker containers: since installation of packages is a part of the Docker build, we cannot natively execute +binaries based on ARM containers on x86 CPUs. To build multi-platform images, we recommend following the official [Docker guidelines](https://docs.docker.com/build/building/multi-platform/#build-multi-platform-images) and providing a static QEMU installation. +On Ubuntu-based distributions, this requires installing an OS package and executing a single Docker command to provide seamless emulation of ARM containers. + ### Cloud Account Identifiers SeBS ensures that all locally cached cloud resources are valid by storing a unique identifier associated with each cloud account. Furthermore, we store this identifier in experiment results to easily match results with the cloud account or subscription that was used to obtain them. We use non-sensitive identifiers such as account IDs on AWS, subscription IDs on Azure, and Google Cloud project IDs. diff --git a/docs/usage.md b/docs/usage.md index 97b6d752..d2813014 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -4,8 +4,7 @@ For each command you can pass `--verbose` flag to increase the verbosity of the By default, all scripts will create a cache in the directory `cache` to store code with dependencies and information on allocated cloud resources.
Benchmarks will be rebuilt after a change in source code is detected. -To enforce redeployment of code and benchmark inputs please use flags `--update-code` -and `--update-storage`, respectively. +To enforce redeployment of code and benchmark inputs, use the flags `--update-code` and `--update-storage`, respectively. To deploy functions as container images (supported on AWS), use the flag `--container-deployment`. **Note:** The cache does not support updating the cloud region. If you want to deploy benchmarks to a new cloud region, then use a new cache directory. diff --git a/requirements.aws.txt b/requirements.aws.txt index aae7bb2f..41434ff8 100644 --- a/requirements.aws.txt +++ b/requirements.aws.txt @@ -2,5 +2,4 @@ boto3 botocore flake8-boto3 urllib3 -boto3-stubs -boto3-stubs[lambda,s3,apigatewayv2,sts,logs,iam] +boto3-stubs[lambda,s3,apigatewayv2,sts,logs,iam,ecr] diff --git a/requirements.txt b/requirements.txt index 2717467c..6c6d01d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,4 +18,5 @@ scipy # pycurl>=7.43 click>=7.1.2 +rich diff --git a/sebs.py b/sebs.py index 0c6a9d48..fcb54087 100755 --- a/sebs.py +++ b/sebs.py @@ -97,6 +97,11 @@ def common_params(func): type=click.Choice(["x64", "arm64"]), help="Target architecture", ) + @click.option( + "--container-deployment/--no-container-deployment", + default=False, + help="Deploy functions as containers (AWS only). When enabled, functions are packaged as container images and pushed to Amazon ECR." + ) @click.option( "--resource-prefix", default=None, @@ -125,6 +130,7 @@ def parse_common_params( language, language_version, architecture, + container_deployment, resource_prefix: Optional[str] = None, initialize_deployment: bool = True, ignore_cache: bool = False, @@ -148,6 +154,7 @@ def parse_common_params( update_nested_dict(config_obj, ["experiments", "update_code"], update_code) update_nested_dict(config_obj, ["experiments", "update_storage"], update_storage) update_nested_dict(config_obj, ["experiments", "architecture"], architecture) + update_nested_dict(config_obj, ["experiments", "container_deployment"], container_deployment) # set the path the configuration was loaded from update_nested_dict(config_obj, ["deployment", "local", "path"], config) @@ -218,6 +225,7 @@ def benchmark(): type=str, help="Attach prefix to generated Docker image tag.", ) +@click.option("--storage-configuration", default=None, type=str, help="JSON configuration of deployed storage.") @common_params def invoke( benchmark, @@ -236,8 +244,9 @@ def invoke( output_dir, logging_filename, sebs_client, - deployment_client, + deployment_client ) = parse_common_params(**kwargs) + if image_tag_prefix is not None: sebs_client.config.image_tag_prefix = image_tag_prefix @@ -249,6 +258,7 @@ def invoke( experiment_config, logging_filename=logging_filename, ) + if memory is not None: benchmark_obj.benchmark_config.memory = memory if timeout is not None: @@ -258,6 +268,7 @@ def invoke( benchmark_obj, function_name if function_name else deployment_client.default_function_name(benchmark_obj), ) + storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage) input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size) @@ -371,7 +382,9 @@ def storage_start(storage, output_json, port): sebs.utils.global_logging() storage_type = sebs.SeBS.get_storage_implementation(StorageTypes(storage)) - storage_config, storage_resources = sebs.SeBS.get_storage_config_implementation(StorageTypes(storage)) + storage_config, storage_resources =
sebs.SeBS.get_storage_config_implementation( + StorageTypes(storage) + ) config = storage_config() resources = storage_resources() @@ -405,7 +418,9 @@ def storage_stop(input_json): resources = storage_resources() logging.info(f"Stopping storage deployment of {storage_type}.") - storage = sebs.SeBS.get_storage_implementation(storage_type).deserialize(config, None, resources) + storage = sebs.SeBS.get_storage_implementation(storage_type).deserialize( + config, None, resources + ) storage.stop() logging.info(f"Stopped storage deployment of {storage_type}.") @@ -421,16 +436,28 @@ def local(): @click.argument("output", type=str) @click.option("--deployments", default=1, type=int, help="Number of deployed containers.") @click.option("--storage-configuration", type=str, help="JSON configuration of deployed storage.") -@click.option("--measure-interval", type=int, default=-1, - help="Interval duration between memory measurements in ms.") +@click.option( + "--measure-interval", + type=int, + default=-1, + help="Interval duration between memory measurements in ms.", +) @click.option( "--remove-containers/--no-remove-containers", default=True, help="Remove containers after stopping.", ) @simplified_common_params -def start(benchmark, benchmark_input_size, output, deployments, storage_configuration, - measure_interval, remove_containers, **kwargs): +def start( + benchmark, + benchmark_input_size, + output, + deployments, + storage_configuration, + measure_interval, + remove_containers, + **kwargs, +): """ Start a given number of function instances and a storage instance. """ @@ -540,10 +567,7 @@ def resources(): @resources.command("list") -@click.argument( - "resource", - type=click.Choice(["buckets", "resource-groups"]) -) +@click.argument("resource", type=click.Choice(["buckets", "resource-groups"])) @common_params def resources_list(resource, **kwargs): @@ -568,32 +592,23 @@ def resources_list(resource, **kwargs): sebs_client.logging.error("Resource groups are only supported on Azure!") return - groups = deployment_client.config.resources.list_resource_groups(deployment_client.cli_instance) + groups = deployment_client.config.resources.list_resource_groups( + deployment_client.cli_instance + ) sebs_client.logging.info("Resource grup:") for idx, bucket in enumerate(groups): sebs_client.logging.info(f"({idx}) {bucket}") @resources.command("remove") -@click.argument( - "resource", - type=click.Choice(["buckets", "resource-groups"]) -) -@click.argument( - "prefix", - type=str -) -@click.option( - "--wait/--no-wait", - type=bool, - default=True, - help="Wait for completion of removal." -) +@click.argument("resource", type=click.Choice(["buckets", "resource-groups"])) +@click.argument("prefix", type=str) +@click.option("--wait/--no-wait", type=bool, default=True, help="Wait for completion of removal.") @click.option( "--dry-run/--no-dry-run", type=bool, default=False, - help="Simulate run without actual deletions." 
+ help="Simulate run without actual deletions.", ) @common_params def resources_remove(resource, prefix, wait, dry_run, **kwargs): @@ -626,13 +641,18 @@ def resources_remove(resource, prefix, wait, dry_run, **kwargs): sebs_client.logging.error("Resource groups are only supported on Azure!") return - groups = deployment_client.config.resources.list_resource_groups(deployment_client.cli_instance) + groups = deployment_client.config.resources.list_resource_groups( + deployment_client.cli_instance + ) for idx, group in enumerate(groups): if len(prefix) > 0 and not group.startswith(prefix): continue sebs_client.logging.info(f"Removing resource group: {group}") - deployment_client.config.resources.delete_resource_group(deployment_client.cli_instance, group, wait) + deployment_client.config.resources.delete_resource_group( + deployment_client.cli_instance, group, wait + ) + if __name__ == "__main__": cli() diff --git a/sebs/aws/aws.py b/sebs/aws/aws.py index a4a1e03b..c8a4642c 100644 --- a/sebs/aws/aws.py +++ b/sebs/aws/aws.py @@ -10,6 +10,7 @@ from sebs.aws.s3 import S3 from sebs.aws.function import LambdaFunction +from sebs.aws.container import ECRContainer from sebs.aws.config import AWSConfig from sebs.faas.config import Resources from sebs.utils import execute @@ -72,6 +73,10 @@ def initialize(self, config: Dict[str, str] = {}, resource_prefix: Optional[str] self.get_storage() self.initialize_resources(select_prefix=resource_prefix) + self.ecr_client = ECRContainer( + self.system_config, self.session, self.config, self.docker_client + ) + def get_lambda_client(self): if not hasattr(self, "client"): self.client = self.session.client( @@ -130,9 +135,20 @@ def package_code( directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: + + container_uri = "" + + # if the containerized deployment is set to True + if container_deployment: + # build base image and upload to ECR + _, container_uri = self.ecr_client.build_base_image( + directory, language_name, language_version, architecture, benchmark, is_cached + ) CONFIG_FILES = { "python": ["handler.py", "requirements.txt", ".python_packages"], @@ -146,7 +162,6 @@ def package_code( if file not in package_config: file = os.path.join(directory, file) shutil.move(file, function_dir) - # FIXME: use zipfile # create zip with hidden directory but without parent directory execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) @@ -157,7 +172,11 @@ def package_code( mbytes = bytes_size / 1024.0 / 1024.0 self.logging.info("Zip archive size {:2f} MB".format(mbytes)) - return os.path.join(directory, "{}.zip".format(benchmark)), bytes_size + return ( + os.path.join(directory, "{}.zip".format(benchmark)), + bytes_size, + container_uri, + ) def _map_architecture(self, architecture: str) -> str: @@ -173,7 +192,13 @@ def _map_language_runtime(self, language: str, runtime: str): return f"{runtime}.x" return runtime - def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFunction": + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: str, + ) -> "LambdaFunction": package = code_package.code_location benchmark = code_package.benchmark @@ -204,42 +229,53 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun self.config.resources.lambda_role(self.session), function_cfg, ) - 
self.update_function(lambda_function, code_package) + self.update_function(lambda_function, code_package, container_deployment, container_uri) lambda_function.updated_code = True # TODO: get configuration of REST API except self.client.exceptions.ResourceNotFoundException: self.logging.info("Creating function {} from {}".format(func_name, package)) - # AWS Lambda limit on zip deployment size - # Limit to 50 MB - # mypy doesn't recognize correctly the case when the same - # variable has different types across the path - code_config: Dict[str, Union[str, bytes]] - if code_size < 50 * 1024 * 1024: - package_body = open(package, "rb").read() - code_config = {"ZipFile": package_body} - # Upload code package to S3, then use it + create_function_params = { + "FunctionName": func_name, + "Role": self.config.resources.lambda_role(self.session), + "MemorySize": memory, + "Timeout": timeout, + "Architectures": [self._map_architecture(architecture)], + "Code": {}, + } + + if container_deployment: + create_function_params["PackageType"] = "Image" + create_function_params["Code"] = {"ImageUri": container_uri} else: - code_package_name = cast(str, os.path.basename(package)) - - code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) - code_prefix = os.path.join(benchmark, architecture, code_package_name) - storage_client.upload(code_bucket, package, code_prefix) - - self.logging.info("Uploading function {} code to {}".format(func_name, code_bucket)) - code_config = {"S3Bucket": code_bucket, "S3Key": code_prefix} - ret = self.client.create_function( - FunctionName=func_name, - Runtime="{}{}".format( + create_function_params["PackageType"] = "Zip" + if code_size < 50 * 1024 * 1024: + package_body = open(package, "rb").read() + create_function_params["Code"] = {"ZipFile": package_body} + else: + code_package_name = cast(str, os.path.basename(package)) + + code_bucket = storage_client.get_bucket(Resources.StorageBucketType.DEPLOYMENT) + code_prefix = os.path.join(benchmark, architecture, code_package_name) + storage_client.upload(code_bucket, package, code_prefix) + + self.logging.info( + "Uploading function {} code to {}".format(func_name, code_bucket) + ) + create_function_params["Code"] = { + "S3Bucket": code_bucket, + "S3Key": code_prefix, + } + + create_function_params["Runtime"] = "{}{}".format( language, self._map_language_runtime(language, language_runtime) - ), - Handler="handler.handler", - Role=self.config.resources.lambda_role(self.session), - MemorySize=memory, - Timeout=timeout, - Code=code_config, - Architectures=[self._map_architecture(architecture)], - ) + ) + create_function_params["Handler"] = "handler.handler" + + create_function_params = { + k: v for k, v in create_function_params.items() if v is not None + } + ret = self.client.create_function(**create_function_params) lambda_function = LambdaFunction( func_name, @@ -284,41 +320,51 @@ def cached_function(self, function: Function): :param memory: memory limit for function """ - def update_function(self, function: Function, code_package: Benchmark): - - function = cast(LambdaFunction, function) + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): name = function.name - code_size = code_package.code_size - package = code_package.code_location - benchmark = code_package.benchmark + function = cast(LambdaFunction, function) - function_cfg = FunctionConfig.from_benchmark(code_package) - architecture = function_cfg.architecture.value + 
if container_deployment: + self.client.update_function_code(FunctionName=name, ImageUri=container_uri) + else: + code_size = code_package.code_size + package = code_package.code_location + benchmark = code_package.benchmark + + function_cfg = FunctionConfig.from_benchmark(code_package) + architecture = function_cfg.architecture.value + + # Run AWS update + # AWS Lambda limit on zip deployment + if code_size < 50 * 1024 * 1024: + with open(package, "rb") as code_body: + self.client.update_function_code( + FunctionName=name, + ZipFile=code_body.read(), + Architectures=[self._map_architecture(architecture)], + ) + # Upload code package to S3, then update + else: + code_package_name = os.path.basename(package) + + storage = cast(S3, self.get_storage()) + bucket = function.code_bucket(code_package.benchmark, storage) + code_prefix = os.path.join(benchmark, architecture, code_package_name) + storage.upload(bucket, package, code_prefix) - # Run AWS update - # AWS Lambda limit on zip deployment - if code_size < 50 * 1024 * 1024: - with open(package, "rb") as code_body: self.client.update_function_code( FunctionName=name, - ZipFile=code_body.read(), + S3Bucket=bucket, + S3Key=code_prefix, Architectures=[self._map_architecture(architecture)], ) - # Upload code package to S3, then update - else: - code_package_name = os.path.basename(package) - - storage = cast(S3, self.get_storage()) - bucket = function.code_bucket(code_package.benchmark, storage) - code_prefix = os.path.join(benchmark, architecture, code_package_name) - storage.upload(bucket, package, code_prefix) - - self.client.update_function_code( - FunctionName=name, - S3Bucket=bucket, - S3Key=code_prefix, - Architectures=[self._map_architecture(architecture)], - ) + self.wait_function_updated(function) self.logging.info(f"Updated code of {name} function. 
") # and update config @@ -351,6 +397,8 @@ def default_function_name(code_package: Benchmark) -> str: code_package.language_version, code_package.architecture, ) + if code_package.container_deployment: + func_name = f"{func_name}-docker" return AWS.format_function_name(func_name) @staticmethod @@ -577,3 +625,6 @@ def wait_function_updated(self, func: LambdaFunction): waiter = self.client.get_waiter("function_updated_v2") waiter.wait(FunctionName=func.name) self.logging.info("Lambda function has been updated.") + + def disable_rich_output(self): + self.ecr_client.disable_rich_output = True diff --git a/sebs/aws/config.py b/sebs/aws/config.py index 3d958585..2d05e842 100644 --- a/sebs/aws/config.py +++ b/sebs/aws/config.py @@ -1,10 +1,11 @@ +import base64 import json import os import time -from typing import cast, Dict, Optional +from typing import cast, Dict, Optional, Tuple import boto3 - +from mypy_boto3_ecr import ECRClient from sebs.cache import Cache from sebs.faas.config import Config, Credentials, Resources @@ -114,16 +115,40 @@ def serialize(self) -> dict: out = {"arn": self.arn, "endpoint": self.endpoint} return out - def __init__(self): + def __init__( + self, + registry: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ): super().__init__(name="aws") + self._docker_registry: Optional[str] = registry if registry != "" else None + self._docker_username: Optional[str] = username if username != "" else None + self._docker_password: Optional[str] = password if password != "" else None + self._container_repository: Optional[str] = None self._lambda_role = "" self._http_apis: Dict[str, AWSResources.HTTPApi] = {} - self._region: Optional[str] = None @staticmethod def typename() -> str: return "AWS.Resources" + @property + def docker_registry(self) -> Optional[str]: + return self._docker_registry + + @property + def docker_username(self) -> Optional[str]: + return self._docker_username + + @property + def docker_password(self) -> Optional[str]: + return self._docker_password + + @property + def container_repository(self) -> Optional[str]: + return self._container_repository + def lambda_role(self, boto3_session: boto3.session.Session) -> str: if not self._lambda_role: iam_client = boto3_session.client(service_name="iam") @@ -217,12 +242,69 @@ def http_api( self.logging.info(f"Using cached HTTP API {api_name}") return http_api - # FIXME: python3.7+ future annotations + def check_ecr_repository_exists( + self, ecr_client: ECRClient, repository_name: str + ) -> Optional[str]: + try: + resp = ecr_client.describe_repositories(repositoryNames=[repository_name]) + return resp["repositories"][0]["repositoryUri"] + except ecr_client.exceptions.RepositoryNotFoundException: + return None + except Exception as e: + self.logging.error(f"Error checking repository: {e}") + raise e + + def get_ecr_repository(self, ecr_client: ECRClient) -> str: + + if self._container_repository is not None: + return self._container_repository + + self._container_repository = "sebs-benchmarks-{}".format(self._resources_id) + + self._docker_registry = self.check_ecr_repository_exists( + ecr_client, self._container_repository + ) + + if self._docker_registry is None: + try: + resp = ecr_client.create_repository(repositoryName=self._container_repository) + self.logging.info(f"Created ECR repository: {self._container_repository}") + + self._docker_registry = resp["repository"]["repositoryUri"] + except ecr_client.exceptions.RepositoryAlreadyExistsException: + # Support the situation 
where two invocations concurrently initialize it. + self.logging.info(f"ECR repository {self._container_repository} already exists.") + self._docker_registry = self.check_ecr_repository_exists( + ecr_client, self._container_repository + ) + + return self._container_repository + + def ecr_repository_authorization(self, ecr_client: ECRClient) -> Tuple[str, str, str]: + + if self._docker_password is None: + response = ecr_client.get_authorization_token() + auth_token = response["authorizationData"][0]["authorizationToken"] + decoded_token = base64.b64decode(auth_token).decode("utf-8") + # Split username:password + self._docker_username, self._docker_password = decoded_token.split(":") + + assert self._docker_username is not None + assert self._docker_registry is not None + + return self._docker_username, self._docker_password, self._docker_registry + @staticmethod def initialize(res: Resources, dct: dict): ret = cast(AWSResources, res) super(AWSResources, AWSResources).initialize(ret, dct) + + if "docker" in dct: + ret._docker_registry = dct["docker"]["registry"] + ret._docker_username = dct["docker"]["username"] + ret._container_repository = dct["container_repository"] + ret._lambda_role = dct["lambda-role"] if "lambda-role" in dct else "" if "http-apis" in dct: for key, value in dct["http-apis"].items(): @@ -235,20 +317,34 @@ def serialize(self) -> dict: **super().serialize(), "lambda-role": self._lambda_role, "http-apis": {key: value.serialize() for (key, value) in self._http_apis.items()}, + "docker": { + "registry": self.docker_registry, + "username": self.docker_username, + }, + "container_repository": self.container_repository, } return out def update_cache(self, cache: Cache): super().update_cache(cache) + cache.update_config( + val=self.docker_registry, keys=["aws", "resources", "docker", "registry"] + ) + cache.update_config( + val=self.docker_username, keys=["aws", "resources", "docker", "username"] + ) + cache.update_config( + val=self.container_repository, keys=["aws", "resources", "container_repository"] + ) cache.update_config(val=self._lambda_role, keys=["aws", "resources", "lambda-role"]) for name, api in self._http_apis.items(): cache.update_config(val=api.serialize(), keys=["aws", "resources", "http-apis", name]) @staticmethod def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: - ret = AWSResources() cached_config = cache.get_config("aws") + # Load cached values if cached_config and "resources" in cached_config: AWSResources.initialize(ret, cached_config["resources"]) diff --git a/sebs/aws/container.py b/sebs/aws/container.py new file mode 100644 index 00000000..e7c2cbe6 --- /dev/null +++ b/sebs/aws/container.py @@ -0,0 +1,79 @@ +import docker +from typing import Tuple + +import boto3 +from botocore.exceptions import ClientError +from mypy_boto3_ecr import ECRClient + +from sebs.aws.config import AWSConfig +from sebs.config import SeBSConfig +from sebs.faas.container import DockerContainer + + +class ECRContainer(DockerContainer): + @staticmethod + def name(): + return "aws" + + @staticmethod + def typename() -> str: + return "AWS.ECRContainer" + + def __init__( + self, + system_config: SeBSConfig, + session: boto3.session.Session, + config: AWSConfig, + docker_client: docker.client.DockerClient, + ): + + super().__init__(system_config, docker_client) + self.ecr_client = session.client(service_name="ecr", region_name=config.region) + self.config = config + + @property + def client(self) -> ECRClient: + return self.ecr_client + + def 
registry_name( + self, benchmark: str, language_name: str, language_version: str, architecture: str + ) -> Tuple[str, str, str, str]: + + account_id = self.config.credentials.account_id + region = self.config.region + registry_name = f"{account_id}.dkr.ecr.{region}.amazonaws.com" + + repository_name = self.config.resources.get_ecr_repository(self.client) + image_tag = self.system_config.benchmark_image_tag( + self.name(), benchmark, language_name, language_version, architecture + ) + image_uri = f"{registry_name}/{repository_name}:{image_tag}" + + return registry_name, repository_name, image_tag, image_uri + + def find_image(self, repository_name, image_tag) -> bool: + try: + response = self.ecr_client.describe_images( + repositoryName=repository_name, imageIds=[{"imageTag": image_tag}] + ) + if response["imageDetails"]: + return True + except ClientError: + return False + + return False + + def push_image(self, repository_uri, image_tag): + + username, password, registry_url = self.config.resources.ecr_repository_authorization( + self.client + ) + + try: + self.docker_client.login(username=username, password=password, registry=registry_url) + super().push_image(repository_uri, image_tag) + self.logging.info(f"Successfully pushed the image to registry {repository_uri}.") + except docker.errors.APIError as e: + self.logging.error(f"Failed to push the image to registry {repository_uri}.") + self.logging.error(f"Error: {str(e)}") + raise RuntimeError("Couldn't push to Docker registry") diff --git a/sebs/azure/azure.py b/sebs/azure/azure.py index 78e45963..53ede0d3 100644 --- a/sebs/azure/azure.py +++ b/sebs/azure/azure.py @@ -159,9 +159,16 @@ def package_code( directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: + + container_uri = "" + + if container_deployment: + raise NotImplementedError("Container Deployment is not supported in Azure") # In previous step we ran a Docker container which installed packages # Python packages are in .python_packages because this is expected by Azure @@ -210,7 +217,7 @@ def package_code( code_size = Benchmark.directory_size(directory) execute("zip -qu -r9 {}.zip * .".format(benchmark), shell=True, cwd=directory) - return directory, code_size + return directory, code_size, container_uri def publish_function( self, @@ -285,7 +292,16 @@ def publish_function( :return: URL to reach HTTP-triggered function """ - def update_function(self, function: Function, code_package: Benchmark): + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): + + if container_deployment: + raise NotImplementedError("Container deployment is not supported in Azure") # Mount code package in Docker instance container_dest = self._mount_function_code(code_package) @@ -322,7 +338,16 @@ def default_function_name(self, code_package: Benchmark) -> str: ) return func_name - def create_function(self, code_package: Benchmark, func_name: str) -> AzureFunction: + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: str, + ) -> AzureFunction: + + if container_deployment: + raise NotImplementedError("Container deployment is not supported in Azure") language = code_package.language_name language_runtime = code_package.language_version @@ -393,7 +418,7 @@ def create_function(self, code_package: Benchmark, func_name: 
str) -> AzureFunct ) # update existing function app - self.update_function(function, code_package) + self.update_function(function, code_package, container_deployment, container_uri) self.cache_client.add_function( deployment_name=self.name(), @@ -502,7 +527,7 @@ def _enforce_cold_start(self, function: Function, code_package: Benchmark): f" --settings ForceColdStart={self.cold_start_counter}" ) - self.update_function(function, code_package) + self.update_function(function, code_package, False, "") def enforce_cold_start(self, functions: List[Function], code_package: Benchmark): self.cold_start_counter += 1 diff --git a/sebs/benchmark.py b/sebs/benchmark.py index 71a54b91..331e8db4 100644 --- a/sebs/benchmark.py +++ b/sebs/benchmark.py @@ -5,7 +5,7 @@ import shutil import subprocess from abc import abstractmethod -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple import docker @@ -124,6 +124,11 @@ def is_cached_valid(self, val: bool): def code_size(self): return self._code_size + @property + def container_uri(self) -> str: + assert self._container_uri is not None + return self._container_uri + @property def language(self) -> "Language": return self._language @@ -140,6 +145,10 @@ def language_version(self): def architecture(self) -> str: return self._architecture + @property + def container_deployment(self): + return self._container_deployment + @property # noqa: A003 def hash(self): path = os.path.join(self.benchmark_path, self.language_name) @@ -170,7 +179,7 @@ def __init__( self._language = config.runtime.language self._language_version = config.runtime.version self._architecture = self._experiment_config.architecture - + self._container_deployment = config.container_deployment self._benchmark_path = find_benchmark(self.benchmark, "benchmarks") if not self._benchmark_path: raise RuntimeError("Benchmark {benchmark} not found!".format(benchmark=self._benchmark)) @@ -192,7 +201,9 @@ def __init__( self._language.value, self._language_version, self._architecture, + "container" if self._container_deployment else "package", ) + self._container_uri: Optional[str] = None # verify existence of function in cache self.query_cache() @@ -233,13 +244,26 @@ def serialize(self) -> dict: return {"size": self.code_size, "hash": self.hash} def query_cache(self): - self._code_package = self._cache_client.get_code_package( - deployment=self._deployment_name, - benchmark=self._benchmark, - language=self.language_name, - language_version=self.language_version, - architecture=self.architecture, - ) + + if self.container_deployment: + self._code_package = self._cache_client.get_container( + deployment=self._deployment_name, + benchmark=self._benchmark, + language=self.language_name, + language_version=self.language_version, + architecture=self.architecture, + ) + if self._code_package is not None: + self._container_uri = self._code_package["image-uri"] + else: + self._code_package = self._cache_client.get_code_package( + deployment=self._deployment_name, + benchmark=self._benchmark, + language=self.language_name, + language_version=self.language_version, + architecture=self.architecture, + ) + self._functions = self._cache_client.get_functions( deployment=self._deployment_name, benchmark=self._benchmark, @@ -486,15 +510,21 @@ def recalculate_code_size(self): return self._code_size def build( - self, deployment_build_step: Callable[[str, str, str, str, bool], Tuple[str, int]] - ) -> Tuple[bool, str]: + self, + deployment_build_step: Callable[ + [str, 
str, str, str, str, bool, bool], Tuple[str, int, str] + ], + ) -> Tuple[bool, str, bool, str]: # Skip build if files are up to date and user didn't enforce rebuild if self.is_cached and self.is_cached_valid: self.logging.info( "Using cached benchmark {} at {}".format(self.benchmark, self.code_location) ) - return False, self.code_location + if self.container_deployment: + return False, self.code_location, self.container_deployment, self.container_uri + + return False, self.code_location, self.container_deployment, "" msg = ( "no cached code package." @@ -515,12 +545,15 @@ def build( self.add_deployment_files(self._output_dir) self.add_deployment_package(self._output_dir) self.install_dependencies(self._output_dir) - self._code_location, self._code_size = deployment_build_step( + + self._code_location, self._code_size, self._container_uri = deployment_build_step( os.path.abspath(self._output_dir), self.language_name, self.language_version, + self.architecture, self.benchmark, self.is_cached_valid, + self.container_deployment, ) self.logging.info( ( @@ -534,14 +567,13 @@ def build( ) ) - # package already exists if self.is_cached: self._cache_client.update_code_package(self._deployment_name, self) else: self._cache_client.add_code_package(self._deployment_name, self) self.query_cache() - return True, self._code_location + return True, self._code_location, self._container_deployment, self._container_uri """ Locates benchmark input generator, inspect how many storage buckets diff --git a/sebs/cache.py b/sebs/cache.py index be1464b7..e5a6744f 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -1,5 +1,6 @@ # https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth import collections.abc +import docker import datetime import json import os @@ -42,8 +43,9 @@ class Cache(LoggingBase): """ config_updated = False - def __init__(self, cache_dir: str): + def __init__(self, cache_dir: str, docker_client: docker.DockerClient): super().__init__() + self.docker_client = docker_client self.cache_dir = os.path.abspath(cache_dir) self.ignore_functions: bool = False self.ignore_storage: bool = False @@ -122,7 +124,7 @@ def get_benchmark_config(self, deployment: str, benchmark: str): def get_code_package( self, deployment: str, benchmark: str, language: str, - language_version: str, architecture: str, + language_version: str, architecture: str ) -> Optional[Dict[str, Any]]: cfg = self.get_benchmark_config(deployment, benchmark) @@ -132,6 +134,18 @@ def get_code_package( else: return None + def get_container( + self, deployment: str, benchmark: str, language: str, + language_version: str, architecture: str + ) -> Optional[Dict[str, Any]]: + cfg = self.get_benchmark_config(deployment, benchmark) + + key = f"{language_version}-{architecture}" + if cfg and language in cfg and key in cfg[language]["containers"]: + return cfg[language]["containers"][key] + else: + return None + def get_functions( self, deployment: str, benchmark: str, language: str ) -> Optional[Dict[str, Any]]: @@ -165,7 +179,11 @@ def update_storage(self, deployment: str, benchmark: str, config: dict): with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(cached_config, fp, indent=2) - def add_code_package(self, deployment_name: str, code_package: "Benchmark"): + def add_code_package( + self, + deployment_name: str, + code_package: "Benchmark", + ): with self._lock: language = code_package.language_name language_version = code_package.language_version @@ -173,10 +191,14 @@ def 
add_code_package(self, deployment_name: str, code_package: "Benchmark"): benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) os.makedirs(benchmark_dir, exist_ok=True) + + package_type = "docker" if code_package.container_deployment else "package" # Check if cache directory for this deployment exist cached_dir = os.path.join( - benchmark_dir, deployment_name, language, language_version, architecture + benchmark_dir, deployment_name, language, + language_version, architecture, package_type ) + if not os.path.exists(cached_dir): os.makedirs(cached_dir, exist_ok=True) @@ -193,22 +215,43 @@ def add_code_package(self, deployment_name: str, code_package: "Benchmark"): # don't store absolute path to avoid problems with moving cache dir relative_cached_loc = os.path.relpath(cached_location, self.cache_dir) language_config["location"] = relative_cached_loc + date = str(datetime.datetime.now()) language_config["date"] = { "created": date, "modified": date, } - # config = {deployment_name: {language: language_config}} - config = { - deployment_name: { - language: { - "code_package": { - f"{language_version}-{architecture}": language_config - }, - "functions": {}, + + key = f"{language_version}-{architecture}" + if code_package.container_deployment: + + image = self.docker_client.images.get(code_package.container_uri) + language_config["image-uri"] = code_package.container_uri + language_config["image-id"] = image.id + + config = { + deployment_name: { + language: { + "containers": { + key: language_config + }, + "code_package": {}, + "functions": {}, + } + } + } + else: + config = { + deployment_name: { + language: { + "code_package": { + key: language_config + }, + "containers": {}, + "functions": {}, + } } } - } # make sure to not replace other entries if os.path.exists(os.path.join(benchmark_dir, "config.json")): @@ -217,9 +260,16 @@ def add_code_package(self, deployment_name: str, code_package: "Benchmark"): if deployment_name in cached_config: # language known, platform known, extend dictionary if language in cached_config[deployment_name]: - cached_config[deployment_name][language]["code_package"][ - f"{language_version}-{architecture}" - ] = language_config + + if code_package.container_deployment: + cached_config[deployment_name][language]["containers"][ + key + ] = language_config + else: + cached_config[deployment_name][language]["code_package"][ + key + ] = language_config + # language unknown, platform known - add new dictionary else: cached_config[deployment_name][language] = config[deployment_name][ @@ -240,17 +290,23 @@ def add_code_package(self, deployment_name: str, code_package: "Benchmark"): ) def update_code_package( - self, deployment_name: str, code_package: "Benchmark" + self, + deployment_name: str, + code_package: "Benchmark", ): with self._lock: language = code_package.language_name language_version = code_package.language_version architecture = code_package.architecture benchmark_dir = os.path.join(self.cache_dir, code_package.benchmark) + + package_type = "docker" if code_package.container_deployment else "package" # Check if cache directory for this deployment exist cached_dir = os.path.join( - benchmark_dir, deployment_name, language, language_version, architecture + benchmark_dir, deployment_name, language, + language_version, architecture, package_type ) + if os.path.exists(cached_dir): # copy code @@ -270,16 +326,33 @@ def update_code_package( with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: config = json.load(fp) date = 
str(datetime.datetime.now()) + key = f"{language_version}-{architecture}" - config[deployment_name][language]["code_package"][key]["date"][ + if code_package.container_deployment: + main_key = "containers" + else: + main_key = "code_package" + + config[deployment_name][language][main_key][key]["date"][ "modified" ] = date - config[deployment_name][language]["code_package"][key][ + config[deployment_name][language][main_key][key][ "hash" ] = code_package.hash - config[deployment_name][language]["code_package"][key][ + config[deployment_name][language][main_key][key][ "size" ] = code_package.code_size + + if code_package.container_deployment: + + image = self.docker_client.images.get(code_package.container_uri) + config[deployment_name][language][main_key][key][ + "image-id" + ] = image.id + config[deployment_name][language][main_key][key][ + "image-uri" + ] = code_package.container_uri + with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(config, fp, indent=2) else: diff --git a/sebs/config.py b/sebs/config.py index dbe9a645..3a34ebb8 100644 --- a/sebs/config.py +++ b/sebs/config.py @@ -34,16 +34,28 @@ def deployment_files(self, deployment_name: str, language_name: str) -> List[str def docker_image_types(self, deployment_name: str, language_name: str) -> List[str]: return self._system_config[deployment_name]["languages"][language_name]["images"] - def supported_language_versions(self, deployment_name: str, language_name: str) -> List[str]: - return self._system_config[deployment_name]["languages"][language_name][ - "base_images" + def supported_language_versions( + self, deployment_name: str, language_name: str, architecture: str + ) -> List[str]: + return self._system_config[deployment_name]["languages"][language_name]["base_images"][ + architecture ].keys() def supported_architecture(self, deployment_name: str) -> List[str]: return self._system_config[deployment_name]["architecture"] - def benchmark_base_images(self, deployment_name: str, language_name: str) -> Dict[str, str]: - return self._system_config[deployment_name]["languages"][language_name]["base_images"] + def supported_package_deployment(self, deployment_name: str) -> bool: + return "package" in self._system_config[deployment_name]["deployments"] + + def supported_container_deployment(self, deployment_name: str) -> bool: + return "container" in self._system_config[deployment_name]["deployments"] + + def benchmark_base_images( + self, deployment_name: str, language_name: str, architecture: str + ) -> Dict[str, str]: + return self._system_config[deployment_name]["languages"][language_name]["base_images"][ + architecture + ] def benchmark_image_name( self, @@ -51,10 +63,13 @@ def benchmark_image_name( benchmark: str, language_name: str, language_version: str, + architecture: str, registry: Optional[str] = None, ) -> str: - tag = self.benchmark_image_tag(system, benchmark, language_name, language_version) + tag = self.benchmark_image_tag( + system, benchmark, language_name, language_version, architecture + ) repo_name = self.docker_repository() if registry is not None: return f"{registry}/{repo_name}:{tag}" @@ -62,9 +77,14 @@ def benchmark_image_name( return f"{repo_name}:{tag}" def benchmark_image_tag( - self, system: str, benchmark: str, language_name: str, language_version: str + self, + system: str, + benchmark: str, + language_name: str, + language_version: str, + architecture: str, ) -> str: - tag = f"function.{system}.{benchmark}.{language_name}-{language_version}" + tag = 
f"function.{system}.{benchmark}.{language_name}-{language_version}-{architecture}" if self.image_tag_prefix: tag = f"{tag}-{self.image_tag_prefix}" return tag diff --git a/sebs/experiments/config.py b/sebs/experiments/config.py index a3bfb96b..26aea9f2 100644 --- a/sebs/experiments/config.py +++ b/sebs/experiments/config.py @@ -7,6 +7,7 @@ class Config: def __init__(self): self._update_code: bool = False self._update_storage: bool = False + self._container_deployment: bool = False self._download_results: bool = False self._architecture: str = "x64" self._flags: Dict[str, bool] = {} @@ -36,6 +37,10 @@ def runtime(self) -> Runtime: def architecture(self) -> str: return self._architecture + @property + def container_deployment(self) -> bool: + return self._container_deployment + def experiment_settings(self, name: str) -> dict: return self._experiment_configs[name] @@ -48,6 +53,7 @@ def serialize(self) -> dict: "flags": self._flags, "experiments": self._experiment_configs, "architecture": self._architecture, + "container_deployment": self._container_deployment, } return out @@ -59,6 +65,7 @@ def deserialize(config: dict) -> "Config": cfg._update_code = config["update_code"] cfg._update_storage = config["update_storage"] cfg._download_results = config["download_results"] + cfg._container_deployment = config["container_deployment"] cfg._runtime = Runtime.deserialize(config["runtime"]) cfg._flags = config["flags"] if "flags" in config else {} cfg._architecture = config["architecture"] diff --git a/sebs/experiments/invocation_overhead.py b/sebs/experiments/invocation_overhead.py index 76f9a41a..335b184f 100644 --- a/sebs/experiments/invocation_overhead.py +++ b/sebs/experiments/invocation_overhead.py @@ -41,7 +41,7 @@ def before_sample(self, size: int, input_benchmark: dict): arr = bytearray((random.getrandbits(8) for i in range(size))) self._benchmark.code_package_modify("randomdata.bin", bytes(arr)) function = self._deployment_client.get_function(self._benchmark) - self._deployment_client.update_function(function, self._benchmark) + self._deployment_client.update_function(function, self._benchmark, False, "") class PayloadSize: diff --git a/sebs/faas/container.py b/sebs/faas/container.py new file mode 100644 index 00000000..7ae9c4d3 --- /dev/null +++ b/sebs/faas/container.py @@ -0,0 +1,211 @@ +from abc import abstractmethod +import docker +import json +import platform +import os +import shutil +from typing import Tuple + +from rich.progress import Progress + +from sebs.config import SeBSConfig +from sebs.utils import LoggingBase, execute, DOCKER_DIR + + +class DockerContainer(LoggingBase): + @staticmethod + @abstractmethod + def name() -> str: + pass + + @property + def disable_rich_output(self) -> bool: + return self._disable_rich_output + + @disable_rich_output.setter + def disable_rich_output(self, val: bool): + self._disable_rich_output = val + + def __init__( + self, + system_config: SeBSConfig, + docker_client, + experimental_manifest: bool = False, + ): + super().__init__() + + self.docker_client = docker_client + self.experimental_manifest = experimental_manifest + self.system_config = system_config + self._disable_rich_output = False + + def find_image(self, repository_name, image_tag) -> bool: + + if self.experimental_manifest: + try: + # This requires enabling experimental Docker features + # Furthermore, it's not yet supported in the Python library + execute(f"docker manifest inspect {repository_name}:{image_tag}") + return True + except RuntimeError: + return False + else: + try: + 
# default version requires pulling for an image + self.docker_client.images.pull(repository=repository_name, tag=image_tag) + return True + except docker.errors.NotFound: + return False + + def show_progress(self, txt: str, progress: Progress, layer_tasks: dict): + + if isinstance(txt, str): + line = json.loads(txt) + else: + line = txt + + status = line.get("status", "") + progress_detail = line.get("progressDetail", {}) + id_ = line.get("id", "") + + if "Pushing" in status and progress_detail: + current = progress_detail.get("current", 0) + total = progress_detail.get("total", 0) + + if id_ not in layer_tasks and total > 0: + # Create new progress task for this layer + description = f"Layer {id_[:12]}" + layer_tasks[id_] = progress.add_task(description, total=total) + if id_ in layer_tasks: + # Update progress for existing task + progress.update(layer_tasks[id_], completed=current) + + elif any(x in status for x in ["Layer already exists", "Pushed"]): + if id_ in layer_tasks: + # Complete the task + progress.update(layer_tasks[id_], completed=progress.tasks[layer_tasks[id_]].total) + + elif "error" in line: + raise Exception(line["error"]) + + def push_image(self, repository_uri, image_tag): + try: + + if not self.disable_rich_output: + + layer_tasks = {} + with Progress() as progress: + + self.logging.info(f"Pushing image {image_tag} to {repository_uri}") + ret = self.docker_client.images.push( + repository=repository_uri, tag=image_tag, stream=True, decode=True + ) + for line in ret: + self.show_progress(line, progress, layer_tasks) + + else: + self.logging.info(f"Pushing image {image_tag} to {repository_uri}") + ret = self.docker_client.images.push( + repository=repository_uri, tag=image_tag, stream=True, decode=True + ) + + for val in ret: + if "error" in val: + self.logging.error(f"Failed to push the image to registry {repository_uri}") + raise RuntimeError(val) + + except docker.errors.APIError as e: + self.logging.error( + f"Failed to push the image to registry {repository_uri}. Error: {str(e)}" + ) + raise e + + @abstractmethod + def registry_name( + self, benchmark: str, language_name: str, language_version: str, architecture: str + ) -> Tuple[str, str, str, str]: + pass + + def build_base_image( + self, + directory: str, + language_name: str, + language_version: str, + architecture: str, + benchmark: str, + is_cached: bool, + ) -> Tuple[bool, str]: + """ + When building function for the first time (according to SeBS cache), + check if Docker image is available in the registry. + If yes, then skip building. + If no, then continue building. + + For every subsequent build, we rebuild image and push it to the + registry. These are triggered by users modifying code and enforcing + a build. + """ + + registry_name, repository_name, image_tag, image_uri = self.registry_name( + benchmark, language_name, language_version, architecture + ) + + # cached package, rebuild not enforced -> check for new one + # if cached is true, no need to build and push the image. + if is_cached: + if self.find_image(repository_name, image_tag): + self.logging.info( + f"Skipping building Docker image for {benchmark}, using " + f"Docker image {image_uri} from registry: {registry_name}." + ) + return False, image_uri + else: + # image doesn't exist, let's continue + self.logging.info( + f"Image {image_uri} doesn't exist in the registry, " + f"building the image for {benchmark}." 
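As a standalone illustration of the streaming protocol that `show_progress` consumes: with `stream=True, decode=True`, the Docker SDK yields one JSON dict per status update, and each `Pushing` entry carries a `progressDetail` with `current`/`total` bytes for a layer. A minimal sketch, assuming the docker SDK and rich are installed; the repository and tag values are placeholders:

```python
import docker
from rich.progress import Progress

client = docker.from_env()
repository, tag = "registry.example.com/sebs/benchmarks", "python-3.8-x64"  # placeholders

layer_tasks = {}
with Progress() as progress:
    for line in client.images.push(repository=repository, tag=tag, stream=True, decode=True):
        status = line.get("status", "")
        detail = line.get("progressDetail", {}) or {}
        layer_id = line.get("id", "")
        if "Pushing" in status and detail.get("total"):
            # One progress bar per layer, keyed by the layer id.
            if layer_id not in layer_tasks:
                layer_tasks[layer_id] = progress.add_task(
                    f"Layer {layer_id[:12]}", total=detail["total"]
                )
            progress.update(layer_tasks[layer_id], completed=detail.get("current", 0))
        elif "error" in line:
            raise RuntimeError(line["error"])
```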
+ ) + + build_dir = os.path.join(directory, "build") + os.makedirs(build_dir, exist_ok=True) + + shutil.copy( + os.path.join(DOCKER_DIR, self.name(), language_name, "Dockerfile.function"), + os.path.join(build_dir, "Dockerfile"), + ) + for fn in os.listdir(directory): + if fn not in ("index.js", "__main__.py"): + file = os.path.join(directory, fn) + shutil.move(file, build_dir) + + with open(os.path.join(build_dir, ".dockerignore"), "w") as f: + f.write("Dockerfile") + + builder_image = self.system_config.benchmark_base_images( + self.name(), language_name, architecture + )[language_version] + self.logging.info(f"Build the benchmark base image {repository_name}:{image_tag}.") + + isa = platform.processor() + if (isa == "x86_64" and architecture != "x64") or ( + isa == "arm64" and architecture != "arm64" + ): + self.logging.warning( + f"Building image for architecture: {architecture} on CPU architecture: {isa}. " + "This step requires configured emulation. If the build fails, please consult " + "our documentation. We recommend QEMU as it can be configured to run automatically." + ) + + buildargs = {"VERSION": language_version, "BASE_IMAGE": builder_image} + image, _ = self.docker_client.images.build( + tag=image_uri, path=build_dir, buildargs=buildargs + ) + + self.logging.info( + f"Push the benchmark base image {repository_name}:{image_tag} " + f"to registry: {registry_name}." + ) + + self.push_image(image_uri, image_tag) + + return True, image_uri diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 2576a0ef..ed2d98bd 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -154,9 +154,21 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: This step allows us to change the structure above to fit different deployment requirements, Example: a zip file for AWS or a specific - directory structure for Azure. - :return: path to packaged code and its size + Args: + directory: Path to the code directory + language_name: Programming language name + language_version: Programming language version + architecture: Target architecture (e.g., 'x64', 'arm64') + benchmark: Benchmark name + is_cached: Whether the code is cached + container_deployment: Whether to package for container deployment + + Returns: + Tuple containing: + - Path to packaged code + - Size of the package + - Container URI """ @abstractmethod @@ -165,13 +177,39 @@ def package_code( directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: pass @abstractmethod - def create_function(self, code_package: Benchmark, func_name: str) -> Function: + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: str, + ) -> Function: + + """ + Create a new function in the FaaS platform. + The implementation is responsible for creating all necessary + cloud resources. 
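The warning above fires when the image targets a different CPU architecture than the host, which only works if emulation (e.g. QEMU with binfmt) is configured. A simplified version of that host-vs-target check; the mapping between SeBS architecture names and `platform.processor()` strings is an assumption and varies by OS:

```python
import platform

# Assumed mapping between SeBS architecture names and host processor strings;
# platform.processor() may also report "arm" or an empty string on some systems.
HOST_ISAS = {"x64": {"x86_64", "amd64"}, "arm64": {"arm64", "aarch64"}}

def build_needs_emulation(target_architecture: str) -> bool:
    host_isa = platform.processor()
    return host_isa not in HOST_ISAS.get(target_architecture, set())

if build_needs_emulation("arm64"):
    print("Cross-architecture build: make sure QEMU/binfmt emulation is configured.")
```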
+ + Args: + code_package: Benchmark containing the function code + func_name: Name of the function + container_deployment: Whether to deploy as a container + container_uri: URI of the container image + + Returns: + Function: Created function instance + + Raises: + NotImplementedError: If container deployment is requested but not supported + """ pass @abstractmethod @@ -179,7 +217,25 @@ def cached_function(self, function: Function): pass @abstractmethod - def update_function(self, function: Function, code_package: Benchmark): + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): + """ + Update an existing function in the FaaS platform. + + Args: + function: Existing function instance to update + code_package: New benchmark containing the function code + container_deployment: Whether to deploy as a container + container_uri: URI of the container image + + Raises: + NotImplementedError: If container deployment is requested but not supported + """ pass """ @@ -198,7 +254,7 @@ def update_function(self, function: Function, code_package: Benchmark): def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function: if code_package.language_version not in self.system_config.supported_language_versions( - self.name(), code_package.language_name + self.name(), code_package.language_name, code_package.architecture ): raise Exception( "Unsupported {language} version {version} in {system}!".format( @@ -210,7 +266,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) if not func_name: func_name = self.default_function_name(code_package) - rebuilt, _ = code_package.build(self.package_code) + rebuilt, _, container_deployment, container_uri = code_package.build(self.package_code) """ There's no function with that name? @@ -227,7 +283,9 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else "function {} not found in cache.".format(func_name) ) self.logging.info("Creating new function! Reason: " + msg) - function = self.create_function(code_package, func_name) + function = self.create_function( + code_package, func_name, container_deployment, container_uri + ) self.cache_client.add_function( deployment_name=self.name(), language_name=code_package.language_name, @@ -259,7 +317,7 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) f"Enforcing rebuild and update of of cached function " f"{func_name} with hash {function.code_package_hash}." 
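The interface change is visible in `get_function` above: `Benchmark.build` now reports, besides the rebuild flag and code location, whether a container image was produced and its URI, and those two values are forwarded untouched to `create_function`/`update_function`. A schematic sketch only, with `deployment_client` and `code_package` assumed to be an initialized FaaS system and a Benchmark instance:

```python
def deploy(deployment_client, code_package, func_name: str):
    # Schematic data flow: build() returns a 4-tuple, and the container flag
    # plus image URI are passed straight through to the platform implementation.
    rebuilt, _, container_deployment, container_uri = code_package.build(
        deployment_client.package_code
    )
    return deployment_client.create_function(
        code_package, func_name, container_deployment, container_uri
    )
```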
) - self.update_function(function, code_package) + self.update_function(function, code_package, container_deployment, container_uri) function.code_package_hash = code_package.hash function.updated_code = True self.cache_client.add_function( @@ -340,6 +398,9 @@ def download_metrics( def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: pass + def disable_rich_output(self): + pass + # @abstractmethod # def get_invocation_error(self, function_name: str, # start_time: int, end_time: int): diff --git a/sebs/gcp/gcp.py b/sebs/gcp/gcp.py index 71ff3185..e1ddbc50 100644 --- a/sebs/gcp/gcp.py +++ b/sebs/gcp/gcp.py @@ -138,9 +138,16 @@ def package_code( directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: + + container_uri = "" + + if container_deployment: + raise NotImplementedError("Container Deployment is not supported in GCP") CONFIG_FILES = { "python": ["handler.py", ".python_packages"], @@ -190,9 +197,18 @@ def package_code( # rename the main.py back to handler.py shutil.move(new_path, old_path) - return os.path.join(directory, "{}.zip".format(benchmark)), bytes_size + return os.path.join(directory, "{}.zip".format(benchmark)), bytes_size, container_uri + + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: str, + ) -> "GCPFunction": - def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFunction": + if container_deployment: + raise NotImplementedError("Container deployment is not supported in GCP") package = code_package.code_location benchmark = code_package.benchmark @@ -274,7 +290,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "GCPFuncti cfg=function_cfg, bucket=code_bucket, ) - self.update_function(function, code_package) + self.update_function(function, code_package, container_deployment, container_uri) # Add LibraryTrigger to a new function from sebs.gcp.triggers import LibraryTrigger @@ -326,7 +342,16 @@ def cached_function(self, function: Function): gcp_trigger.logging_handlers = self.logging_handlers gcp_trigger.deployment_client = self - def update_function(self, function: Function, code_package: Benchmark): + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): + + if container_deployment: + raise NotImplementedError("Container deployment is not supported in GCP") function = cast(GCPFunction, function) language_runtime = code_package.language_version diff --git a/sebs/local/local.py b/sebs/local/local.py index c24e1b10..9549b792 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -129,9 +129,11 @@ def package_code( directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: CONFIG_FILES = { "python": ["handler.py", "requirements.txt", ".python_packages"], @@ -150,9 +152,18 @@ def package_code( mbytes = bytes_size / 1024.0 / 1024.0 self.logging.info("Function size {:2f} MB".format(mbytes)) - return directory, bytes_size + return directory, bytes_size, "" - def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction": + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: 
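Platforms that do not support container images simply accept the widened signatures and fail early, as the GCP implementation above does and the Local and OpenWhisk zip paths below do. A minimal sketch of that guard for a hypothetical platform:

```python
from typing import Tuple

class UnsupportedContainerPlatform:
    """Hypothetical platform: accepts the new parameters but rejects containers."""

    def package_code(
        self,
        directory: str,
        language_name: str,
        language_version: str,
        architecture: str,
        benchmark: str,
        is_cached: bool,
        container_deployment: bool,
    ) -> Tuple[str, int, str]:
        if container_deployment:
            raise NotImplementedError("Container deployment is not supported here")
        archive = f"{directory}/{benchmark}.zip"  # placeholder packaging result
        return archive, 0, ""  # empty string in place of a container URI
```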
str, + ) -> "LocalFunction": + + if container_deployment: + raise NotImplementedError("Container deployment is not supported in Local") container_name = "{}:run.local.{}.{}".format( self._system_config.docker_repository(), @@ -281,7 +292,13 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc FIXME: restart Docker? """ - def update_function(self, function: Function, code_package: Benchmark): + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): pass """ diff --git a/sebs/openwhisk/container.py b/sebs/openwhisk/container.py new file mode 100644 index 00000000..2dd27717 --- /dev/null +++ b/sebs/openwhisk/container.py @@ -0,0 +1,46 @@ +import docker +from typing import Tuple + +from sebs.faas.container import DockerContainer +from sebs.config import SeBSConfig +from sebs.openwhisk.config import OpenWhiskConfig + + +class OpenWhiskContainer(DockerContainer): + @staticmethod + def name() -> str: + return "openwhisk" + + @staticmethod + def typename() -> str: + return "OpenWhisk.Container" + + def __init__( + self, + system_config: SeBSConfig, + config: OpenWhiskConfig, + docker_client: docker.client, + experimental_manifest: bool, + ): + super().__init__(system_config, docker_client, experimental_manifest) + self.config = config + + def registry_name( + self, benchmark: str, language_name: str, language_version: str, architecture: str + ) -> Tuple[str, str, str, str]: + + registry_name = self.config.resources.docker_registry + + # We need to retag created images when pushing to registry other + # than default + repository_name = self.system_config.docker_repository() + image_tag = self.system_config.benchmark_image_tag( + self.name(), benchmark, language_name, language_version, architecture + ) + if registry_name is not None and registry_name != "": + repository_name = f"{registry_name}/{repository_name}" + else: + registry_name = "Docker Hub" + image_uri = f"{repository_name}:{image_tag}" + + return registry_name, repository_name, image_tag, image_uri diff --git a/sebs/openwhisk/openwhisk.py b/sebs/openwhisk/openwhisk.py index 14192cf6..a3bc5ab6 100644 --- a/sebs/openwhisk/openwhisk.py +++ b/sebs/openwhisk/openwhisk.py @@ -1,5 +1,4 @@ import os -import shutil import subprocess from typing import cast, Dict, List, Optional, Tuple, Type @@ -9,9 +8,10 @@ from sebs.cache import Cache from sebs.faas import System, PersistentStorage from sebs.faas.function import Function, ExecutionResult, Trigger +from sebs.openwhisk.container import OpenWhiskContainer from sebs.openwhisk.storage import Minio from sebs.openwhisk.triggers import LibraryTrigger, HTTPTrigger -from sebs.utils import DOCKER_DIR, LoggingHandlers, execute +from sebs.utils import LoggingHandlers from .config import OpenWhiskConfig from .function import OpenWhiskFunction, OpenWhiskFunctionConfig from ..config import SeBSConfig @@ -32,6 +32,10 @@ def __init__( self._config = config self.logging_handlers = logger_handlers + self.container_client = OpenWhiskContainer( + self.system_config, self.config, self.docker_client, self.config.experimentalManifest + ) + if self.config.resources.docker_username: if self.config.resources.docker_registry: docker_client.login( @@ -94,125 +98,22 @@ def get_wsk_cmd(self) -> List[str]: cmd.append("-i") return cmd - def find_image(self, repository_name, image_tag) -> bool: - - if self.config.experimentalManifest: - try: - # This requires enabling experimental Docker features - # Furthermore, it's 
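For a concrete picture of what `OpenWhiskContainer.registry_name` returns, here is the same composition with stand-in values; the repository and registry address are hypothetical and in practice come from `SeBSConfig` and `OpenWhiskConfig`:

```python
# Stand-in values; the real ones come from SeBSConfig and OpenWhiskConfig.
docker_repository = "myorg/serverless-benchmarks"
image_tag = "function.openwhisk.110.dynamic-html.python-3.8-x64"
docker_registry = "registry.example.com:5000"  # set to None to fall back to Docker Hub

if docker_registry:
    registry_name = docker_registry
    repository_name = f"{docker_registry}/{docker_repository}"
else:
    registry_name = "Docker Hub"
    repository_name = docker_repository

image_uri = f"{repository_name}:{image_tag}"
print(registry_name, repository_name, image_tag, image_uri)
```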
not yet supported in the Python library - execute(f"docker manifest inspect {repository_name}:{image_tag}") - return True - except RuntimeError: - return False - else: - try: - # default version requires pulling for an image - self.docker_client.images.pull(repository=repository_name, tag=image_tag) - return True - except docker.errors.NotFound: - return False - - def build_base_image( - self, - directory: str, - language_name: str, - language_version: str, - benchmark: str, - is_cached: bool, - ) -> bool: - """ - When building function for the first time (according to SeBS cache), - check if Docker image is available in the registry. - If yes, then skip building. - If no, then continue building. - - For every subsequent build, we rebuild image and push it to the - registry. These are triggered by users modifying code and enforcing - a build. - """ - - # We need to retag created images when pushing to registry other - # than default - registry_name = self.config.resources.docker_registry - repository_name = self.system_config.docker_repository() - image_tag = self.system_config.benchmark_image_tag( - self.name(), benchmark, language_name, language_version - ) - if registry_name is not None and registry_name != "": - repository_name = f"{registry_name}/{repository_name}" - else: - registry_name = "Docker Hub" - - # Check if the image is already in the registry. - # cached package, rebuild not enforced -> check for new one - if is_cached: - if self.find_image(repository_name, image_tag): - self.logging.info( - f"Skipping building OpenWhisk Docker package for {benchmark}, using " - f"Docker image {repository_name}:{image_tag} from registry: " - f"{registry_name}." - ) - return False - else: - # image doesn't exist, let's continue - self.logging.info( - f"Image {repository_name}:{image_tag} doesn't exist in the registry, " - f"building OpenWhisk package for {benchmark}." - ) - - build_dir = os.path.join(directory, "docker") - os.makedirs(build_dir, exist_ok=True) - shutil.copy( - os.path.join(DOCKER_DIR, self.name(), language_name, "Dockerfile.function"), - os.path.join(build_dir, "Dockerfile"), - ) - - for fn in os.listdir(directory): - if fn not in ("index.js", "__main__.py"): - file = os.path.join(directory, fn) - shutil.move(file, build_dir) - - with open(os.path.join(build_dir, ".dockerignore"), "w") as f: - f.write("Dockerfile") - - builder_image = self.system_config.benchmark_base_images(self.name(), language_name)[ - language_version - ] - self.logging.info(f"Build the benchmark base image {repository_name}:{image_tag}.") - - buildargs = {"VERSION": language_version, "BASE_IMAGE": builder_image} - image, _ = self.docker_client.images.build( - tag=f"{repository_name}:{image_tag}", path=build_dir, buildargs=buildargs - ) - - # Now push the image to the registry - # image will be located in a private repository - self.logging.info( - f"Push the benchmark base image {repository_name}:{image_tag} " - f"to registry: {registry_name}." 
- ) - ret = self.docker_client.images.push( - repository=repository_name, tag=image_tag, stream=True, decode=True - ) - # doesn't raise an exception for some reason - for val in ret: - if "error" in val: - self.logging.error(f"Failed to push the image to registry {registry_name}") - raise RuntimeError(val) - return True - def package_code( self, directory: str, language_name: str, language_version: str, + architecture: str, benchmark: str, is_cached: bool, - ) -> Tuple[str, int]: + container_deployment: bool, + ) -> Tuple[str, int, str]: # Regardless of Docker image status, we need to create .zip file # to allow registration of function with OpenWhisk - self.build_base_image(directory, language_name, language_version, benchmark, is_cached) + _, image_uri = self.container_client.build_base_image( + directory, language_name, language_version, architecture, benchmark, is_cached + ) # We deploy Minio config in code package since this depends on local # deployment - it cannnot be a part of Docker image @@ -229,7 +130,7 @@ def package_code( self.logging.info(f"Created {benchmark_archive} archive") bytes_size = os.path.getsize(benchmark_archive) self.logging.info("Zip archive size {:2f} MB".format(bytes_size / 1024.0 / 1024.0)) - return benchmark_archive, bytes_size + return benchmark_archive, bytes_size, image_uri def storage_arguments(self) -> List[str]: storage = cast(Minio, self.get_storage()) @@ -245,7 +146,13 @@ def storage_arguments(self) -> List[str]: storage.config.address, ] - def create_function(self, code_package: Benchmark, func_name: str) -> "OpenWhiskFunction": + def create_function( + self, + code_package: Benchmark, + func_name: str, + container_deployment: bool, + container_uri: str, + ) -> "OpenWhiskFunction": self.logging.info("Creating function as an action in OpenWhisk.") try: actions = subprocess.run( @@ -270,7 +177,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "OpenWhisk ) # Update function - we don't know what version is stored self.logging.info(f"Retrieved existing OpenWhisk action {func_name}.") - self.update_function(res, code_package) + self.update_function(res, code_package, container_deployment, container_uri) else: try: self.logging.info(f"Creating new OpenWhisk action {func_name}") @@ -279,6 +186,7 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "OpenWhisk code_package.benchmark, code_package.language_name, code_package.language_version, + code_package.architecture, ) subprocess.run( [ @@ -321,7 +229,13 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "OpenWhisk return res - def update_function(self, function: Function, code_package: Benchmark): + def update_function( + self, + function: Function, + code_package: Benchmark, + container_deployment: bool, + container_uri: str, + ): self.logging.info(f"Update an existing OpenWhisk action {function.name}.") function = cast(OpenWhiskFunction, function) docker_image = self.system_config.benchmark_image_name( @@ -329,6 +243,7 @@ def update_function(self, function: Function, code_package: Benchmark): code_package.benchmark, code_package.language_name, code_package.language_version, + code_package.architecture, ) try: subprocess.run( @@ -457,3 +372,6 @@ def cached_function(self, function: Function): cast(LibraryTrigger, trigger).wsk_cmd = self.get_wsk_cmd() for trigger in function.triggers(Trigger.TriggerType.HTTP): trigger.logging_handlers = self.logging_handlers + + def disable_rich_output(self): + self.container_client.disable_rich_output = 
True diff --git a/sebs/regression.py b/sebs/regression.py index e83afa7d..03920538 100644 --- a/sebs/regression.py +++ b/sebs/regression.py @@ -28,34 +28,51 @@ benchmarks_nodejs = ["110.dynamic-html", "120.uploader", "210.thumbnailer"] architectures_aws = ["x64", "arm64"] +deployments_aws = ["package", "container"] + architectures_gcp = ["x64"] +deployments_gcp = ["package"] + architectures_azure = ["x64"] +deployments_azure = ["package"] + architectures_openwhisk = ["x64"] +deployments_openwhisk = ["container"] # user-defined config passed during initialization cloud_config: Optional[dict] = None class TestSequenceMeta(type): - def __init__(cls, name, bases, attrs, benchmarks, architectures, deployment_name, triggers): + def __init__( + cls, name, bases, attrs, benchmarks, architectures, + deployments, deployment_name, triggers + ): type.__init__(cls, name, bases, attrs) cls.deployment_name = deployment_name cls.triggers = triggers - def __new__(mcs, name, bases, dict, benchmarks, architectures, deployment_name, triggers): - def gen_test(benchmark_name, architecture): + def __new__( + mcs, name, bases, dict, benchmarks, architectures, + deployments, deployment_name, triggers + ): + def gen_test(benchmark_name, architecture, deployment_type): def test(self): - log_name = f"Regression-{deployment_name}-{benchmark_name}" + log_name = f"Regression-{deployment_name}-{benchmark_name}-{deployment_type}" logger = logging.getLogger(log_name) logger.setLevel(logging.INFO) logging_wrapper = ColoredWrapper(log_name, logger) self.experiment_config["architecture"] = architecture + self.experiment_config["container_deployment"] = deployment_type == "container" deployment_client = self.get_deployment(benchmark_name, architecture) + deployment_client.disable_rich_output() + logging_wrapper.info( - f"Begin regression test of {benchmark_name} on {deployment_client.name()}." + f"Begin regression test of {benchmark_name} on {deployment_client.name()}. " + f"Architecture {architecture}, deployment type: {deployment_type}." 
) experiment_config = self.client.get_experiment_config(self.experiment_config) @@ -109,9 +126,12 @@ def test(self): for architecture in architectures: - # for trigger in triggers: - test_name = f"test_{deployment_name}_{benchmark}_{architecture}" - dict[test_name] = gen_test(benchmark, architecture) + for deployment_type in deployments: + + # for trigger in triggers: + test_name = f"test_{deployment_name}_{benchmark}" + test_name += f"_{architecture}_{deployment_type}" + dict[test_name] = gen_test(benchmark, architecture, deployment_type) dict["lock"] = threading.Lock() dict["cfg"] = None @@ -123,6 +143,7 @@ class AWSTestSequencePython( metaclass=TestSequenceMeta, benchmarks=benchmarks_python, architectures=architectures_aws, + deployments=deployments_aws, deployment_name="aws", triggers=[Trigger.TriggerType.LIBRARY, Trigger.TriggerType.HTTP], ): @@ -151,6 +172,7 @@ class AWSTestSequenceNodejs( metaclass=TestSequenceMeta, benchmarks=benchmarks_nodejs, architectures=architectures_aws, + deployments=deployments_aws, deployment_name="aws", triggers=[Trigger.TriggerType.LIBRARY, Trigger.TriggerType.HTTP], ): @@ -174,6 +196,7 @@ class AzureTestSequencePython( metaclass=TestSequenceMeta, benchmarks=benchmarks_python, architectures=architectures_azure, + deployments=deployments_azure, deployment_name="azure", triggers=[Trigger.TriggerType.HTTP], ): @@ -214,6 +237,7 @@ class AzureTestSequenceNodejs( metaclass=TestSequenceMeta, benchmarks=benchmarks_nodejs, architectures=architectures_azure, + deployments=deployments_azure, deployment_name="azure", triggers=[Trigger.TriggerType.HTTP], ): @@ -250,6 +274,7 @@ class GCPTestSequencePython( metaclass=TestSequenceMeta, benchmarks=benchmarks_python, architectures=architectures_gcp, + deployments=deployments_gcp, deployment_name="gcp", triggers=[Trigger.TriggerType.HTTP], ): @@ -273,6 +298,7 @@ class GCPTestSequenceNodejs( metaclass=TestSequenceMeta, benchmarks=benchmarks_nodejs, architectures=architectures_gcp, + deployments=deployments_gcp, deployment_name="gcp", triggers=[Trigger.TriggerType.HTTP], ): @@ -296,6 +322,7 @@ class OpenWhiskTestSequencePython( metaclass=TestSequenceMeta, benchmarks=benchmarks_python, architectures=architectures_openwhisk, + deployments=deployments_openwhisk, deployment_name="openwhisk", triggers=[Trigger.TriggerType.HTTP], ): @@ -319,6 +346,7 @@ class OpenWhiskTestSequenceNodejs( metaclass=TestSequenceMeta, benchmarks=benchmarks_nodejs, architectures=architectures_openwhisk, + deployments=deployments_openwhisk, deployment_name="openwhisk", triggers=[Trigger.TriggerType.HTTP], ): @@ -351,8 +379,8 @@ def __init__(self): def status(self, *args, **kwargs): self.all_correct = self.all_correct and (kwargs["test_status"] in ["inprogress", "success"]) - bench, arch = kwargs["test_id"].split("_")[-2:None] - test_name = f"{bench}, {arch}" + bench, arch, deployment_type = kwargs["test_id"].split("_")[-3:None] + test_name = f"{bench}, {arch}, {deployment_type}" if not kwargs["test_status"]: test_id = kwargs["test_id"] if test_id not in self.output: diff --git a/sebs/sebs.py b/sebs/sebs.py index fb732f45..ed3fc580 100644 --- a/sebs/sebs.py +++ b/sebs/sebs.py @@ -60,8 +60,8 @@ def __init__( logging_filename: Optional[str] = None, ): super().__init__() - self._cache_client = Cache(cache_dir) self._docker_client = docker.from_env() + self._cache_client = Cache(cache_dir, self._docker_client) self._config = SeBSConfig() self._output_dir = output_dir self._verbose = verbose @@ -116,10 +116,21 @@ def get_deployment( ) ) + if 
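Putting the regression changes together: the test matrix now expands over (benchmark, architecture, deployment type), and the generated test ids are exactly what `status()` later splits back apart. An illustration using a subset of the AWS lists defined above:

```python
benchmarks_python = ["110.dynamic-html", "120.uploader"]  # subset for illustration
architectures_aws = ["x64", "arm64"]
deployments_aws = ["package", "container"]

for benchmark in benchmarks_python:
    for architecture in architectures_aws:
        for deployment_type in deployments_aws:
            test_name = f"test_aws_{benchmark}_{architecture}_{deployment_type}"
            print(test_name)
            # status() recovers the last three components of the test id:
            bench, arch, dep = test_name.split("_")[-3:]
            assert (bench, arch, dep) == (benchmark, architecture, deployment_type)
```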
config["experiments"][ + "container_deployment" + ] and not self._config.supported_container_deployment(name): + raise RuntimeError(f"Container deployment is not supported in {name}.") + + if not config["experiments"][ + "container_deployment" + ] and not self._config.supported_package_deployment(name): + raise RuntimeError(f"Code package deployment is not supported in {name}.") + # FIXME: future annotations, requires Python 3.7+ handlers = self.generate_logging_handlers(logging_filename) if not deployment_config: deployment_config = Config.deserialize(dep_config, self.cache_client, handlers) + deployment_client = implementations[name]( self._config, deployment_config, # type: ignore diff --git a/sebs/utils.py b/sebs/utils.py index 3b635020..a86ebb6a 100644 --- a/sebs/utils.py +++ b/sebs/utils.py @@ -54,7 +54,7 @@ def execute(cmd, shell=False, cwd=None): def update_nested_dict(cfg: dict, keys: List[str], value: Optional[str]): - if value: + if value is not None: # make sure parent keys exist for key in keys[:-1]: cfg = cfg.setdefault(key, {})