Skip to content

Commit

Permalink
Handles errors and update simulation status in TDS (#8)
Browse files Browse the repository at this point in the history
* handles errors from pyciems and our wrapper code

* adding vis

* working service with latest changes

* Fix status mapping

---------

Co-authored-by: Five Grant <5@fivegrant.com>
  • Loading branch information
marshHawk4 and fivegrant authored Jul 13, 2023
1 parent 6f6c4e4 commit 9d903f4
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 25 deletions.
16 changes: 12 additions & 4 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@ class Timespan(BaseModel):


class Status(Enum):
queued = "queued"
running = "running"
cancelled = "cancelled"
complete = "complete"
error = "error"
queued = "queued"
running = "running"
failed = "failed"
started = "started"
finished = "finished"

@staticmethod
def from_rq(rq_status):
rq_status_to_tds_status = {
"cancelled": "cancelled",
"complete": "complete",
"error": "error",
"queued": "queued",
"running": "running",
"failed": "failed",
"started": "running",
"finished": "complete",
"error": "error",
"finished": "complete"
}
return Status(rq_status_to_tds_status[rq_status])

Expand Down
10 changes: 6 additions & 4 deletions api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ def get_status(simulation_id: str) -> StatusSimulationIdGetResponse:
from utils import fetch_job_status

status = fetch_job_status(simulation_id)
print(status)
if not isinstance(status, str):
return status

return {"status": Status.from_rq(status)}


import logging
@app.post("/simulate", response_model=JobResponse)
def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse:
def simulate_model(body: SimulatePostRequest) -> JobResponse:
"""
Perform a simulation
"""
from utils import create_job

# Parse request body
engine = str(body.engine.value).lower()
model_config_id = body.model_config_id
Expand All @@ -72,6 +72,7 @@ def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse:
"start": start,
"end": end,
"extra": body.extra.dict(),
"visual_options": True
}

resp = create_job(operation_name=operation_name, options=options)
Expand All @@ -82,7 +83,7 @@ def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse:


@app.post("/calibrate", response_model=JobResponse)
def calibrate_model(body: CalibratePostRequest) -> CalibratePostResponse:
def calibrate_model(body: CalibratePostRequest) -> JobResponse:
"""
Calibrate a model
"""
Expand All @@ -106,6 +107,7 @@ def calibrate_model(body: CalibratePostRequest) -> CalibratePostResponse:
"end": end,
"dataset": dataset.dict(),
"extra": extra,
"visual_options": True
}

resp = create_job(operation_name=operation_name, options=options)
Expand Down
27 changes: 18 additions & 9 deletions workers/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
import os
import urllib
import sys

import logging
import numpy as np
import requests
from utils import update_tds_status, parse_samples_into_csv, fetch_dataset, fetch_model, attach_files
from utils import update_tds_status,\
parse_samples_into_csv,\
fetch_dataset,\
fetch_model,\
attach_files,\
catch_job_status

from pyciemss.PetriNetODE.interfaces import (
load_and_calibrate_and_sample_petri_model,
Expand All @@ -18,7 +23,7 @@
OUTPUT_FILENAME = os.getenv("PYCIEMSS_OUTPUT_FILEPATH")
TDS_API = os.getenv("TDS_URL")


@catch_job_status
def simulate(*args, **kwargs):
model_config_id = kwargs.pop("model_config_id")
num_samples = kwargs.pop("num_samples")
Expand All @@ -35,15 +40,19 @@ def simulate(*args, **kwargs):

# Generate timepoints
time_count = end - start
timepoints = map(float, range(1, time_count + 1))
timepoints=[x for x in range(1,time_count+1)]

samples = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs)
output = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs)
samples = output.get('data')
schema = output.get('visual')
with open("visualization.json", "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(OUTPUT_FILENAME, index=False)
attach_files({OUTPUT_FILENAME: "result.csv"}, TDS_API, TDS_SIMULATIONS, job_id)

return True
attach_files({OUTPUT_FILENAME: "result.csv", "visualization.json": "visualization.json"}, TDS_API, TDS_SIMULATIONS, job_id)

return

@catch_job_status
def calibrate_then_simulate(*args, **kwargs):
model_config_id = kwargs.pop("model_config_id")
start = kwargs.pop("start")
Expand All @@ -59,7 +68,7 @@ def calibrate_then_simulate(*args, **kwargs):

# Generate timepoints
time_count = end - start
timepoints = map(float, range(1, time_count + 1))
timepoints=[x for x in range(1,time_count+1)]

dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API)

Expand Down
59 changes: 51 additions & 8 deletions workers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from datetime import datetime
import pandas
import numpy as np
import time
import logging

TDS_SIMULATIONS = "/simulations/"
OUTPUT_FILENAME = os.getenv("PYCIEMSS_OUTPUT_FILEPATH")
TDS_API = os.getenv("TDS_URL")

def parse_samples_into_file(samples):
# samples_str = json.dumps(samples)
Expand Down Expand Up @@ -131,17 +136,55 @@ def fetch_dataset(dataset: dict, tds_api):
return dataset_path


def attach_files(files: dict, tds_api, simulation_endpoint, job_id):
def attach_files(files: dict, tds_api, simulation_endpoint, job_id, status='complete'):
sim_results_url = tds_api + simulation_endpoint + job_id
for (location, handle) in files.items():
upload_url = f"{sim_results_url}/upload-url?filename={handle}"
upload_response = requests.get(upload_url)
presigned_upload_url = upload_response.json()["url"]
with open(location, "rb") as f:
upload_response = requests.put(presigned_upload_url, f)

if status!="error":
for (location, handle) in files.items():
upload_url = f"{sim_results_url}/upload-url?filename={handle}"
upload_response = requests.get(upload_url)
presigned_upload_url = upload_response.json()["url"]
with open(location, "rb") as f:
upload_response = requests.put(presigned_upload_url, f)


# Update simulation object with status and filepaths.
update_tds_status(
sim_results_url, status="complete", result_files=list(files.values()), finish=True
sim_results_url, status=status, result_files=list(files.values()), finish=True
)


def catch_job_status( function):
"""
decorator that catches failed wrapped rq jobs and make sure the simulation status is set in tds.
"""
def wrapped(*args, **kwargs):
try:
start_time = time.perf_counter()
result = function(*args, **kwargs)
end_time = time.perf_counter()
logging.info(
f"Elapsed time for {function.__name__}:",
end_time - start_time
)
return result
except Exception as e:

attach_files({OUTPUT_FILENAME: "result.csv"},
TDS_API,
TDS_SIMULATIONS,
job_id=kwargs.get('job_id'),
status="error")

log_message = f"""
###############################
There was an exception in CIEMSS Service
Error occured in function: {function.__name__}
################################
"""
logging.exception(log_message)
raise e
return wrapped

0 comments on commit 9d903f4

Please sign in to comment.