Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

terarium dataservice migration changes #51

Merged
merged 28 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e131b22
Update to new dataservice endpoints, added basic auth vars. Some cleanup
kbirk Jan 8, 2024
5b71a93
Id is not server generated
kbirk Jan 8, 2024
5a1971a
fix lint errors
dgauldie Jan 8, 2024
136d48f
fix formatting errors
dgauldie Jan 8, 2024
e44eb3e
fix formatting errors
dgauldie Jan 8, 2024
0e6ea0a
fixed broken unit tests
dgauldie Jan 9, 2024
d65060a
Some fixes
kbirk Jan 9, 2024
3450b88
Merge remote-tracking branch 'origin/main' into dataservice-update
dgauldie Jan 10, 2024
838367d
fix issues after merge
dgauldie Jan 11, 2024
abdf4ce
fixed endpoint
dgauldie Jan 11, 2024
76dd404
removed unused consts
dgauldie Jan 11, 2024
33381bf
Bump minimum Python version to 3.10
fivegrant Jan 11, 2024
76e68ee
Update test input / output
kbirk Jan 11, 2024
afccfc2
Merge branch 'dataservice-update' of github.com:DARPA-ASKEM/pyciemss-…
kbirk Jan 11, 2024
ab5c872
fixed unit tests
dgauldie Jan 11, 2024
34e3d8e
Merge remote-tracking branch 'origin/dataservice-update' into dataser…
dgauldie Jan 11, 2024
890f986
fix formatting
dgauldie Jan 11, 2024
1923de8
More fixes
kbirk Jan 11, 2024
c9c07fd
fix formatting
dgauldie Jan 11, 2024
1fdcc79
fix paths
mwdchang Jan 24, 2024
131366d
attempt to fix tests
mwdchang Jan 24, 2024
62dd30e
uuid
mwdchang Jan 29, 2024
a7d1725
model-configuration url
mwdchang Jan 29, 2024
294473c
use request to fetch presigned-url
mwdchang Jan 30, 2024
c504df4
revert csv => url
mwdchang Jan 31, 2024
41867a4
dataframe wrangling, tds.session => requests
mwdchang Feb 1, 2024
80f487a
fix test
mwdchang Feb 1, 2024
8976f8b
fix test
mwdchang Feb 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ indent_style = space
[*.{yml,yaml}]
indent_style = space

[*.{py}]
indent_size = 4
indent_style = space

[*.{cmd,bat}]
end_of_line = crlf

Expand Down
26 changes: 3 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ authors = ["Powell Fendley", "Five Grant"]
readme = "README.md"
packages = [{include = "service"}, {include = "tests"}]


[tool.poetry.dependencies]
python = "^3.9"
python = "^3.10"
requests = "^2.31.0"
fastapi = "^0.96.0"
rq = "^1.15.0"
Expand All @@ -21,6 +22,7 @@ poethepoet = "^0.21.1"
# juliacall = { version="^0.9.14", optional = true }
dill = "^0.3.7"


[tool.poetry.scripts]
mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer"

Expand All @@ -35,6 +37,7 @@ mock = "^5.1.0"
fakeredis = "^2.17.0"
httpx = "^0.24.1"


[tool.poe.tasks]
install-pyciemss = "pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@087bc64d935f2ab5090330f1f7d6bde930404115 --use-pep517"

Expand All @@ -47,3 +50,7 @@ pythonpath = "service"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"


[tool.ruff]
ignore = ["E501"]
13 changes: 4 additions & 9 deletions service/execute.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging

# from juliacall import newmodule
from settings import settings
from utils.tds import (
update_tds_status,
cleanup_job_dir,
Expand All @@ -10,9 +9,6 @@

from pyciemss.interfaces import sample, calibrate, ensemble_sample # noqa: F401

TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL

# jl = newmodule("SciMLIntegration")
# jl.seval("using SciMLIntegration, PythonCall")

Expand All @@ -21,9 +17,8 @@


def run(request, *, job_id):
logging.debug(f"STARTED {job_id} (username: {request.username})")
sim_results_url = TDS_URL + TDS_SIMULATIONS + job_id
update_tds_status(sim_results_url, status="running", start=True)
logging.debug(f"STARTED {job_id} (user_id: {request.user_id})")
update_tds_status(job_id, status="running", start=True)

# if request.engine == "ciemss":
operation_name = request.__class__.pyciemss_lib_function
Expand All @@ -36,6 +31,6 @@ def run(request, *, job_id):
# operation = request.__class__.sciml_lib_function
# output = operation(job_id, jl)

attach_files(output, TDS_URL, TDS_SIMULATIONS, job_id)
attach_files(output, job_id)
cleanup_job_dir(job_id)
logging.debug(f"FINISHED {job_id} (username: {request.username})")
logging.debug(f"FINISHED {job_id} (user_id: {request.user_id})")
34 changes: 10 additions & 24 deletions service/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
from utils.convert import convert_to_static_interventions, convert_to_solution_mapping
from utils.rabbitmq import gen_rabbitmq_hook # noqa: F401
from utils.tds import fetch_dataset, fetch_model, fetch_inferred_parameters
from settings import settings

TDS_CONFIGURATIONS = "/model_configurations/"
TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL


class Timespan(BaseModel):
Expand Down Expand Up @@ -95,7 +90,7 @@ class QuantityOfInterest(BaseModel):
class OperationRequest(BaseModel):
pyciemss_lib_function: ClassVar[str] = ""
engine: str = Field("ciemss", example="ciemss")
username: str = Field("not_provided", example="not_provided")
user_id: str = Field("not_provided", example="not_provided")

def gen_pyciemss_args(self, job_id):
raise NotImplementedError("PyCIEMSS cannot handle this operation")
Expand Down Expand Up @@ -137,15 +132,13 @@ class Simulate(OperationRequest):

def gen_pyciemss_args(self, job_id):
# Get model from TDS
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)

interventions = convert_to_static_interventions(self.interventions)

extra_options = self.extra.dict()
inferred_parameters = fetch_inferred_parameters(
extra_options.pop("inferred_parameters"), TDS_URL, job_id
extra_options.pop("inferred_parameters"), job_id
)

return {
Expand All @@ -159,9 +152,7 @@ def gen_pyciemss_args(self, job_id):
}

def run_sciml_operation(self, job_id, julia_context):
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)
with open(amr_path, "r") as file:
amr = file.read()
result = julia_context.simulate(amr, self.timespan.start, self.timespan.end)
Expand Down Expand Up @@ -209,11 +200,9 @@ class Calibrate(OperationRequest):
)

def gen_pyciemss_args(self, job_id):
amr_path = fetch_model(
self.model_config_id, TDS_URL, TDS_CONFIGURATIONS, job_id
)
amr_path = fetch_model(self.model_config_id, job_id)

dataset_path = fetch_dataset(self.dataset.dict(), TDS_URL, job_id)
dataset_path = fetch_dataset(self.dataset.dict(), job_id)

# TODO: Test RabbitMQ
try:
Expand Down Expand Up @@ -271,10 +260,7 @@ def gen_pyciemss_args(self, job_id):
solution_mappings = [
convert_to_solution_mapping(config) for config in self.model_configs
]
amr_paths = [
fetch_model(config.id, TDS_URL, TDS_CONFIGURATIONS, job_id)
for config in self.model_configs
]
amr_paths = [fetch_model(config.id, job_id) for config in self.model_configs]

return {
"model_paths_or_jsons": amr_paths,
Expand Down Expand Up @@ -310,7 +296,7 @@ class Config:
# pyciemss_lib_function: ClassVar[
# str
# ] = "load_and_calibrate_and_sample_ensemble_model"
# username: str = Field("not_provided", example="not_provided")
# user_id: str = Field("not_provided", example="not_provided")
# model_configs: List[ModelConfig] = Field(
# [],
# example=[],
Expand All @@ -327,11 +313,11 @@ class Config:
# solution_mappings = [config.solution_mappings for config
# in self.model_configs]
# amr_paths = [
# fetch_model(config.id, TDS_URL, TDS_CONFIGURATIONS, job_id)
# fetch_model(config.id, job_id)
# for config in self.model_configs
# ]

# dataset_path = fetch_dataset(self.dataset.dict(), TDS_URL, job_id)
# dataset_path = fetch_dataset(self.dataset.dict(), job_id)

# # Generate timepoints
# time_count = self.timespan.end - self.timespan.start
Expand Down
2 changes: 2 additions & 0 deletions service/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class Settings(BaseSettings):
"""

TDS_URL: str = "http://data-service-api:8000"
TDS_USER: str = "user"
TDS_PASSWORD: str = "password"
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
RABBITMQ_HOST: str = "rabbitmq.pyciemss"
Expand Down
41 changes: 14 additions & 27 deletions service/utils/rq_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

import logging
from uuid import uuid4
import json
import requests

from fastapi import Response, status
from redis import Redis
Expand All @@ -13,10 +11,7 @@
from rq.job import Job

from settings import settings
from utils.tds import update_tds_status

TDS_SIMULATIONS = "/simulations/"
TDS_URL = settings.TDS_URL
from utils.tds import update_tds_status, create_tds_job, cancel_tds_job

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
Expand All @@ -27,42 +22,37 @@ def get_redis():


def update_status_on_job_fail(job, connection, etype, value, traceback):
update_tds_status(TDS_URL + TDS_SIMULATIONS + str(job.id), "error")
update_tds_status(str(job.id), "error")
log_message = f"""
###############################

There was an exception in CIEMSS Service

job: {job.id}
{etype}: {value}
{etype}: {value}
################################
"""
logging.exception(log_message)


def create_job(request_payload, sim_type, redis_conn):
job_id = f"ciemss-{uuid4()}"
workflow_id = f"{uuid4()}"

post_url = TDS_URL + TDS_SIMULATIONS
payload = {
"id": job_id,
"name": workflow_id,
"execution_payload": request_payload.dict(),
"result_files": [],
"type": sim_type,
"status": "queued",
"engine": request_payload.engine,
"workflow_id": job_id,
"workflow_id": workflow_id,
}
logging.info(payload)
response = requests.post(post_url, json=payload)
if response.status_code >= 300:
raise Exception(
(
"Failed to create simulation on TDS "
f"(status: {response.status_code}): {json.dumps(payload)}"
)
)
logging.info(response.content)

res = create_tds_job(payload)
job_id = res["id"]

logging.info(res)

queue = Queue(connection=redis_conn, default_timeout=-1)
queue.enqueue_call(
Expand All @@ -80,7 +70,7 @@ def fetch_job_status(job_id, redis_conn):
"""Fetch a job's results from RQ.

Args:
job_id (str): The id of the job being run in RQ.
job_id (uuid): The id of the job being run in RQ.

Returns:
Response:
Expand Down Expand Up @@ -111,10 +101,7 @@ def kill_job(job_id, redis_conn):
else:
job.cancel()

url = TDS_URL + TDS_SIMULATIONS + str(job_id)
tds_payload = requests.get(url).json()
tds_payload["status"] = "cancelled"
requests.put(url, json=json.loads(json.dumps(tds_payload, default=str)))
cancel_tds_job(str(job_id))

result = job.get_status()
return result
Loading
Loading