diff --git a/README.md b/README.md index 72ddaee..cbdf6e1 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ The service is a light wrapper service around [pyciemss](https://github.com/ciem Both a FastAPI and RQ tasks are provided so jobs can be run asynchronously for long periods of time. The service must also [conform to this spec](https://github.com/DARPA-ASKEM/simulation-api-spec). +Experimental: `sciml` engine can be chosen for `simulate`. + ## Startup To start the PyCIEMSS Simulation API, first run: @@ -21,8 +23,8 @@ to start the containers and the API. The API url will be `http://localhost:8010` ### Result Files Every operation saves 3 files to S3 - `result.csv` -- `eval.csv` -- `visualization.json` +- `eval.csv` (if pyciemss engine is used) +- `visualization.json` (if pyciemss engine is used) ### RabbitMQ Only the `calibrate` operation reports progress to RabbitMQ. This is to diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api index 9d9eb1c..fac0e6e 100644 --- a/docker/Dockerfile.api +++ b/docker/Dockerfile.api @@ -16,4 +16,4 @@ ENV REDIS_HOST redis ENV REDIS_PORT 6379 WORKDIR /service -CMD uvicorn server:app --reload --host 0.0.0.0 --port 8000 +CMD uvicorn api:app --reload --host 0.0.0.0 --port 8000 diff --git a/docker/Dockerfile.worker b/docker/Dockerfile.worker index be74809..024ebd1 100644 --- a/docker/Dockerfile.worker +++ b/docker/Dockerfile.worker @@ -1,13 +1,20 @@ FROM python:3.10 +RUN wget --no-verbose -O julia.tar.gz "https://julialang-s3.julialang.org/bin/linux/$(uname -m|sed 's/86_//')/1.9/julia-1.9.2-linux-$(uname -m).tar.gz" +RUN tar -xzf "julia.tar.gz" && mv julia-1.9.2 /opt/julia && \ + ln -s /opt/julia/bin/julia /usr/local/bin/julia && rm "julia.tar.gz" + WORKDIR / + +RUN julia -e 'using Pkg; Pkg.add(url="https://github.com/jataware/SciMLIntegration.jl", rev="main"); Pkg.precompile()' + RUN pip install --no-cache-dir poetry==1.5.1 COPY pyproject.toml pyproject.toml COPY poetry.lock poetry.lock RUN poetry config virtualenvs.create false && \ poetry install --no-root --no-cache --extras worker -RUN pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@fg/remove-pika +RUN pip install --no-cache-dir git+https://github.com/ciemss/pyciemss.git COPY service service COPY README.md README.md diff --git a/poetry.lock b/poetry.lock index 24349df..9862d20 100644 --- a/poetry.lock +++ b/poetry.lock @@ -34,17 +34,17 @@ files = [ [[package]] name = "boto3" -version = "1.28.15" +version = "1.28.16" description = "The AWS SDK for Python" optional = false python-versions = ">= 3.7" files = [ - {file = "boto3-1.28.15-py3-none-any.whl", hash = "sha256:84b7952858e9319968b0348d9894a91a6bb5f31e81a45c68044d040a12362abe"}, - {file = "boto3-1.28.15.tar.gz", hash = "sha256:a6e711e0b6960c3a5b789bd30c5a18eea7263f2a59fc07f85efa5e04804e49d2"}, + {file = "boto3-1.28.16-py3-none-any.whl", hash = "sha256:d8e31f69fb919025a5961f8fbeb51fe92e2f753beb37fc1853138667a231cdaa"}, + {file = "boto3-1.28.16.tar.gz", hash = "sha256:aea48aedf3e8676e598e3202e732295064a4fcad5f2d2d2a699368b8c3ab492c"}, ] [package.dependencies] -botocore = ">=1.31.15,<1.32.0" +botocore = ">=1.31.16,<1.32.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -53,13 +53,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.31.15" +version = "1.31.16" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">= 3.7" files = [ - {file = "botocore-1.31.15-py3-none-any.whl", hash = "sha256:b3a0f787f275711875476cbe12a0123b2e6570b2f505e2fa509dcec3c5410b57"}, - {file = "botocore-1.31.15.tar.gz", hash = "sha256:b46d1ce4e0cf42d28fdf61ce0c999904645d38b51cb809817a361c0cec16d487"}, + {file = "botocore-1.31.16-py3-none-any.whl", hash = "sha256:92b240e2cb7b3afae5361651d2f48ee582f45d2dab53aef76eef7eec1d3ce582"}, + {file = "botocore-1.31.16.tar.gz", hash = "sha256:563e15979e763b93d78de58d0fc065f8615be12f41bab42f5ad9f412b6a224b3"}, ] [package.dependencies] @@ -290,6 +290,34 @@ files = [ {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] +[[package]] +name = "juliacall" +version = "0.9.14" +description = "Julia and Python in seamless harmony" +optional = true +python-versions = "~=3.7" +files = [ + {file = "juliacall-0.9.14-py3-none-any.whl", hash = "sha256:663339bfca454fe02aa8024f918db4e24007bb4d583cad371ace78baff39c4b3"}, + {file = "juliacall-0.9.14.tar.gz", hash = "sha256:65aa489280ac7e92f4e309cccec1994bdcffbb37f423d1b351f3f52bb35d5514"}, +] + +[package.dependencies] +juliapkg = ">=0.1.8,<0.2.0" + +[[package]] +name = "juliapkg" +version = "0.1.10" +description = "Julia version manager and package manager" +optional = true +python-versions = "*" +files = [ + {file = "juliapkg-0.1.10-py3-none-any.whl", hash = "sha256:716a4e665bd3c9cc9321d45712d60ba624c50d64ec73b04a7f0ee962649c8f1b"}, + {file = "juliapkg-0.1.10.tar.gz", hash = "sha256:70507318d51ac8663e856f56048764e49f5a0c4c90d81a3712d039a316369505"}, +] + +[package.dependencies] +semantic-version = ">=2.9,<3.0" + [[package]] name = "markupsafe" version = "2.1.3" @@ -649,6 +677,21 @@ botocore = ">=1.12.36,<2.0a.0" [package.extras] crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"] +[[package]] +name = "semantic-version" +version = "2.10.0" +description = "A library implementing the 'SemVer' scheme." 
+optional = true +python-versions = ">=2.7" +files = [ + {file = "semantic_version-2.10.0-py2.py3-none-any.whl", hash = "sha256:de78a3b8e0feda74cabc54aab2da702113e33ac9d9eb9d2389bcf1f58b7d9177"}, + {file = "semantic_version-2.10.0.tar.gz", hash = "sha256:bdabb6d336998cbb378d4b9db3a4b56a1e3235701dc05ea2690d9a997ed5041c"}, +] + +[package.extras] +dev = ["Django (>=1.11)", "check-manifest", "colorama (<=0.4.1)", "coverage", "flake8", "nose2", "readme-renderer (<25.0)", "tox", "wheel", "zest.releaser[recommended]"] +doc = ["Sphinx", "sphinx-rtd-theme"] + [[package]] name = "six" version = "1.16.0" @@ -799,9 +842,9 @@ h11 = ">=0.8" standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"] [extras] -worker = ["numpy", "torch"] +worker = ["juliacall", "numpy", "torch"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "206fdfe1f074b9f815006821d223ffda6f29b7be5923d5dcc4010fd9e4f96f26" +content-hash = "87fad367f66cd06d28816f742c0cda9b8a61540283523e3a0af90e3e75f9d231" diff --git a/pyproject.toml b/pyproject.toml index bf084db..924bc01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,13 @@ boto3 = "^1.26.147" uvicorn = "^0.22.0" pika = "1.3.2" pandas = "^2.0.0" +filelock = "^3.12.2" +juliacall = { version="^0.9.14", optional = true } torch = { version = "^2.0.1", optional = true } numpy = { version = "^1.24.0", optional = true } -filelock = "^3.12.2" [tool.poetry.extras] -worker = ["torch", "numpy"] +worker = ["torch", "numpy", "juliacall"] [tool.poetry.scripts] mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer" diff --git a/service/server.py b/service/api.py similarity index 90% rename from service/server.py rename to service/api.py index 559a194..a6753bd 100644 --- a/service/server.py +++ b/service/api.py @@ -90,7 +90,7 @@ def simulate_model(body: SimulatePostRequest) -> JobResponse: """ Perform a simulation """ - resp = create_job("operations.simulate", body, "simulate") + resp = create_job("simulate", body, "simulate") response = {"simulation_id": resp["id"]} return response @@ -100,7 +100,7 @@ def calibrate_model(body: CalibratePostRequest) -> JobResponse: """ Calibrate a model """ - resp = create_job("operations.calibrate_then_simulate", body, "calibrate") + resp = create_job("calibrate_then_simulate", body, "calibrate") response = {"simulation_id": resp["id"]} return response @@ -110,7 +110,7 @@ def create_simulate_ensemble(body: EnsembleSimulatePostRequest) -> JobResponse: """ Perform ensemble simulate """ - resp = create_job("operations.ensemble_simulate", body, "ensemble-simulate") + resp = create_job("ensemble_simulate", body, "ensemble-simulate") response = {"simulation_id": resp["id"]} return response @@ -120,6 +120,6 @@ def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse """ Perform ensemble simulate """ - resp = create_job("operations.ensemble_calibrate", body, "ensemble-calibrate") + resp = create_job("ensemble_calibrate", body, "ensemble-calibrate") response = {"simulation_id": resp["id"]} return response diff --git a/service/operations/__init__.py b/service/operations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/service/operations.py b/service/operations/ciemss.py similarity index 99% rename from service/operations.py rename to service/operations/ciemss.py index 1f4b593..19c6709 100644 --- a/service/operations.py +++ b/service/operations/ciemss.py @@ -28,6 
+28,7 @@ logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) + @update_status_on_job_fail def simulate(request, *, job_id): logging.debug(f"{job_id} (username - {request.username}): start simulate") diff --git a/service/operations/sciml.py b/service/operations/sciml.py new file mode 100644 index 0000000..ed595e7 --- /dev/null +++ b/service/operations/sciml.py @@ -0,0 +1,43 @@ +import io +import json +import os +import urllib +import sys +import logging +import pandas as pd +import numpy as np +from juliacall import newmodule +import requests +from settings import settings +from utils.rq_helpers import update_status_on_job_fail +from utils.tds import update_tds_status, cleanup_job_dir, fetch_dataset, fetch_model, attach_files +from utils.rabbitmq import gen_rabbitmq_hook + +TDS_CONFIGURATIONS = "/model_configurations/" +TDS_SIMULATIONS = "/simulations/" +TDS_URL = settings.TDS_URL + +logging.basicConfig() +logging.getLogger().setLevel(logging.DEBUG) + +jl = newmodule("SciMLIntegration") +jl.seval("using SciMLIntegration, PythonCall") + +@update_status_on_job_fail +def simulate(request, *, job_id): + logging.debug(f"{job_id} (username - {request.username}): start simulate") + + sim_results_url = TDS_URL + TDS_SIMULATIONS + job_id + + update_tds_status(sim_results_url, status="running", start=True) + + # Get model from TDS + amr_path = fetch_model(request.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id) + with open(amr_path, "r") as file: + amr = file.read() + + result = jl.pytable(jl.simulate(amr, request.timespan.start, request.timespan.end)) + + attach_files({"data": result}, TDS_URL, TDS_SIMULATIONS, job_id) + cleanup_job_dir(job_id) + logging.debug(f"{job_id} (username - {request.username}): finish simulate") diff --git a/service/utils/rq_helpers.py b/service/utils/rq_helpers.py index a0b3c36..4483cc0 100644 --- a/service/utils/rq_helpers.py +++ b/service/utils/rq_helpers.py @@ -38,13 +38,14 @@ settings.REDIS_HOST, settings.REDIS_PORT ) -q = Queue(connection=redis, default_timeout=-1) +queue = Queue(connection=redis, default_timeout=-1) def create_job(operation_name: str, request_payload, sim_type: str): random_id = str(uuid.uuid4()) job_id = f"ciemss-{random_id}" - job = q.fetch_job(job_id) + job = queue.fetch_job(job_id) + operation = f"operations.{request_payload.engine}.{operation_name}" post_url = TDS_URL + TDS_SIMULATIONS #+ job_id payload = { @@ -53,7 +54,7 @@ def create_job(operation_name: str, request_payload, sim_type: str): "result_files": [], "type": sim_type, "status": "queued", - "engine": "ciemss", + "engine": request_payload.engine, "workflow_id": job_id, } logging.info(payload) @@ -63,7 +64,7 @@ def create_job(operation_name: str, request_payload, sim_type: str): raise Exception(f"Failed to create simulation on TDS (status: {response.status_code}): {json.dumps(payload)}") logging.info(response.content) - job = q.enqueue_call(func=operation_name, args=[request_payload], kwargs={"job_id": job_id}, job_id=job_id) + job = queue.enqueue_call(func=operation, args=[request_payload], kwargs={"job_id": job_id}, job_id=job_id) status = job.get_status() if status in ("finished", "failed"): diff --git a/service/utils/tds.py b/service/utils/tds.py index 5d59691..9939116 100644 --- a/service/utils/tds.py +++ b/service/utils/tds.py @@ -95,18 +95,27 @@ def fetch_dataset(dataset: dict, tds_api, job_id): def attach_files(output: dict, tds_api, simulation_endpoint, job_id, status='complete'): sim_results_url = tds_api + simulation_endpoint + job_id job_dir = 
get_job_dir(job_id) + files = {} output_filename = os.path.join(job_dir, "./result.csv") + data_result = output.get('data', None) + if data_result is not None: + data_result.to_csv(output_filename, index=False) + files[output_filename] = "result.csv" + eval_output_filename = os.path.join(job_dir, "./eval.csv") + eval_result = output.get('quantiles', None) + if eval_result is not None: + eval_result.to_csv(eval_output_filename, index=False) + files[eval_output_filename] = "eval.csv" + visualization_filename = os.path.join(job_dir, "./visualization.json") - samples = output.get('data') - schema = output.get('visual') - with open(visualization_filename, "w") as f: - json.dump(schema, f, indent=2) - samples.to_csv(output_filename, index=False) - eval = output.get('quantiles') - eval.to_csv(eval_output_filename, index=False) - files = {output_filename: "result.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"} + viz_result = output.get('visual', None) + if viz_result is not None: + with open(visualization_filename, "w") as f: + json.dump(viz_result, f, indent=2) + files[visualization_filename] = "visualization.json" + if status!="error": for (location, handle) in files.items():
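
The dispatch change in `service/utils/rq_helpers.py` is what ties the rest of this PR together: `create_job` now builds the job target as `f"operations.{request_payload.engine}.{operation_name}"`, so a payload carrying `engine: "sciml"` runs the new `operations/sciml.py` `simulate`, while `engine: "ciemss"` keeps the existing path in `operations/ciemss.py`. The snippet below is an illustrative sketch only, not code from the PR; it assumes the `operations` package introduced here is importable and mimics the string-to-function lookup that RQ performs on the worker when `enqueue_call` is given a dotted path.

```python
# Minimal sketch: resolving a dotted operation path such as "operations.sciml.simulate".
# RQ accepts a string for `func` in enqueue_call and imports it when the worker picks
# up the job; importlib is used here purely to mimic that lookup.
from importlib import import_module


def resolve_operation(engine: str, operation_name: str):
    # e.g. engine="sciml", operation_name="simulate" -> operations.sciml.simulate
    module = import_module(f"operations.{engine}")
    return getattr(module, operation_name)
```

This also accounts for the README wording above: the sciml `simulate` passes only `{"data": result}` to `attach_files`, and the reworked `attach_files` writes each of `result.csv`, `eval.csv`, and `visualization.json` only when its key (`data`, `quantiles`, `visual`) is present, so `eval.csv` and `visualization.json` are produced only when the pyciemss engine is used.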