Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental SciML simulate endpoint #31

Merged
merged 5 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ The service is a light wrapper service around [pyciemss](https://github.com/ciem
Both a FastAPI and RQ tasks are provided so jobs can be run asynchronously for long periods
of time. The service must also [conform to this spec](https://github.com/DARPA-ASKEM/simulation-api-spec).

Experimental: `sciml` engine can be chosen for `simulate`.

## Startup

To start the PyCIEMSS Simulation API, first run:
Expand All @@ -21,8 +23,8 @@ to start the containers and the API. The API url will be `http://localhost:8010`
### Result Files
Every operation saves 3 files to S3
- `result.csv`
- `eval.csv`
- `visualization.json`
- `eval.csv` (if pyciemss engine is used)
- `visualization.json` (if pyciemss engine is used)

### RabbitMQ
Only the `calibrate` operation reports progress to RabbitMQ. This is to
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ ENV REDIS_HOST redis
ENV REDIS_PORT 6379

WORKDIR /service
CMD uvicorn server:app --reload --host 0.0.0.0 --port 8000
CMD uvicorn api:app --reload --host 0.0.0.0 --port 8000
9 changes: 8 additions & 1 deletion docker/Dockerfile.worker
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
FROM python:3.10

RUN wget --no-verbose -O julia.tar.gz "https://julialang-s3.julialang.org/bin/linux/$(uname -m|sed 's/86_//')/1.9/julia-1.9.2-linux-$(uname -m).tar.gz"
RUN tar -xzf "julia.tar.gz" && mv julia-1.9.2 /opt/julia && \
ln -s /opt/julia/bin/julia /usr/local/bin/julia && rm "julia.tar.gz"

WORKDIR /

RUN julia -e 'using Pkg; Pkg.add(url="https://github.com/jataware/SciMLIntegration.jl", rev="main"); Pkg.precompile()'

RUN pip install --no-cache-dir poetry==1.5.1
COPY pyproject.toml pyproject.toml
COPY poetry.lock poetry.lock
RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-cache --extras worker

RUN pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@fg/remove-pika
RUN pip install --no-cache-dir git+https://github.com/ciemss/pyciemss.git
COPY service service
COPY README.md README.md

Expand Down
61 changes: 52 additions & 9 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ boto3 = "^1.26.147"
uvicorn = "^0.22.0"
pika = "1.3.2"
pandas = "^2.0.0"
filelock = "^3.12.2"
juliacall = { version="^0.9.14", optional = true }
torch = { version = "^2.0.1", optional = true }
numpy = { version = "^1.24.0", optional = true }
filelock = "^3.12.2"

[tool.poetry.extras]
worker = ["torch", "numpy"]
worker = ["torch", "numpy", "juliacall"]

[tool.poetry.scripts]
mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer"
Expand Down
8 changes: 4 additions & 4 deletions service/server.py → service/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def simulate_model(body: SimulatePostRequest) -> JobResponse:
"""
Perform a simulation
"""
resp = create_job("operations.simulate", body, "simulate")
resp = create_job("simulate", body, "simulate")
response = {"simulation_id": resp["id"]}
return response

Expand All @@ -100,7 +100,7 @@ def calibrate_model(body: CalibratePostRequest) -> JobResponse:
"""
Calibrate a model
"""
resp = create_job("operations.calibrate_then_simulate", body, "calibrate")
resp = create_job("calibrate_then_simulate", body, "calibrate")
response = {"simulation_id": resp["id"]}
return response

Expand All @@ -110,7 +110,7 @@ def create_simulate_ensemble(body: EnsembleSimulatePostRequest) -> JobResponse:
"""
Perform ensemble simulate
"""
resp = create_job("operations.ensemble_simulate", body, "ensemble-simulate")
resp = create_job("ensemble_simulate", body, "ensemble-simulate")
response = {"simulation_id": resp["id"]}
return response

Expand All @@ -120,6 +120,6 @@ def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse
"""
Perform ensemble calibrate
"""
resp = create_job("operations.ensemble_calibrate", body, "ensemble-calibrate")
resp = create_job("ensemble_calibrate", body, "ensemble-calibrate")
response = {"simulation_id": resp["id"]}
return response
Empty file added service/operations/__init__.py
Empty file.
1 change: 1 addition & 0 deletions service/operations.py → service/operations/ciemss.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


@update_status_on_job_fail
def simulate(request, *, job_id):
logging.debug(f"{job_id} (username - {request.username}): start simulate")
Expand Down
43 changes: 43 additions & 0 deletions service/operations/sciml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import io
import json
import os
import urllib
import sys
import logging
import pandas as pd
import numpy as np
from juliacall import newmodule
import requests
from settings import settings
from utils.rq_helpers import update_status_on_job_fail
from utils.tds import update_tds_status, cleanup_job_dir, fetch_dataset, fetch_model, attach_files
from utils.rabbitmq import gen_rabbitmq_hook

# TDS endpoint path fragments; concatenated with TDS_URL and a job id to
# build request URLs for model configurations and simulation records.
TDS_CONFIGURATIONS = "/model_configurations/"
TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL

# Root logger at DEBUG so per-job progress messages from the worker are visible.
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

# Dedicated Julia namespace for this worker; the `using` runs once when this
# module is imported, so Julia package loading is not repeated per job.
jl = newmodule("SciMLIntegration")
jl.seval("using SciMLIntegration, PythonCall")

@update_status_on_job_fail
def simulate(request, *, job_id):
    """Run a SciML simulation for *request* and attach the results to TDS.

    Marks the TDS simulation record as running, fetches the model
    configuration (AMR) from TDS, simulates it over the requested timespan
    via the Julia SciMLIntegration module, uploads the resulting table as
    the job's data file, and cleans up the job's scratch directory.
    """
    logging.debug(f"{job_id} (username - {request.username}): start simulate")

    simulation_url = TDS_URL + TDS_SIMULATIONS + job_id
    update_tds_status(simulation_url, status="running", start=True)

    # Pull the model definition from TDS and read it in as raw text.
    model_path = fetch_model(request.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id)
    with open(model_path, "r") as model_file:
        amr_text = model_file.read()

    # Simulate in Julia, then convert the Julia table to a Python one.
    span = request.timespan
    result = jl.pytable(jl.simulate(amr_text, span.start, span.end))

    attach_files({"data": result}, TDS_URL, TDS_SIMULATIONS, job_id)
    cleanup_job_dir(job_id)
    logging.debug(f"{job_id} (username - {request.username}): finish simulate")
9 changes: 5 additions & 4 deletions service/utils/rq_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@
settings.REDIS_HOST,
settings.REDIS_PORT
)
q = Queue(connection=redis, default_timeout=-1)
queue = Queue(connection=redis, default_timeout=-1)


def create_job(operation_name: str, request_payload, sim_type: str):
random_id = str(uuid.uuid4())
job_id = f"ciemss-{random_id}"
job = q.fetch_job(job_id)
job = queue.fetch_job(job_id)
operation = f"operations.{request_payload.engine}.{operation_name}"

post_url = TDS_URL + TDS_SIMULATIONS #+ job_id
payload = {
Expand All @@ -53,7 +54,7 @@ def create_job(operation_name: str, request_payload, sim_type: str):
"result_files": [],
"type": sim_type,
"status": "queued",
"engine": "ciemss",
"engine": request_payload.engine,
"workflow_id": job_id,
}
logging.info(payload)
Expand All @@ -63,7 +64,7 @@ def create_job(operation_name: str, request_payload, sim_type: str):
raise Exception(f"Failed to create simulation on TDS (status: {response.status_code}): {json.dumps(payload)}")
logging.info(response.content)

job = q.enqueue_call(func=operation_name, args=[request_payload], kwargs={"job_id": job_id}, job_id=job_id)
job = queue.enqueue_call(func=operation, args=[request_payload], kwargs={"job_id": job_id}, job_id=job_id)

status = job.get_status()
if status in ("finished", "failed"):
Expand Down
25 changes: 17 additions & 8 deletions service/utils/tds.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,27 @@ def fetch_dataset(dataset: dict, tds_api, job_id):
def attach_files(output: dict, tds_api, simulation_endpoint, job_id, status='complete'):
sim_results_url = tds_api + simulation_endpoint + job_id
job_dir = get_job_dir(job_id)
files = {}

output_filename = os.path.join(job_dir, "./result.csv")
data_result = output.get('data', None)
if data_result is not None:
data_result.to_csv(output_filename, index=False)
files[output_filename] = "result.csv"

eval_output_filename = os.path.join(job_dir, "./eval.csv")
eval_result = output.get('quantiles', None)
if eval_result is not None:
eval_result.to_csv(eval_output_filename, index=False)
files[eval_output_filename] = "eval.csv"

visualization_filename = os.path.join(job_dir, "./visualization.json")
samples = output.get('data')
schema = output.get('visual')
with open(visualization_filename, "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(output_filename, index=False)
eval = output.get('quantiles')
eval.to_csv(eval_output_filename, index=False)
files = {output_filename: "result.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}
viz_result = output.get('visual', None)
if viz_result is not None:
with open(visualization_filename, "w") as f:
json.dump(viz_result, f, indent=2)
files[visualization_filename] = "visualization.json"


if status!="error":
for (location, handle) in files.items():
Expand Down