From 9d903f415dc0740092c8b29c795e71d911e11d2c Mon Sep 17 00:00:00 2001 From: marshHawk4 Date: Thu, 13 Jul 2023 14:44:42 -0400 Subject: [PATCH] Handles errors and update simulation status in TDS (#8) * handles errors from pyciems and our wrapper code * adding vis * working service with latest changes * Fix status mapping --------- Co-authored-by: Five Grant <5@fivegrant.com> --- api/models.py | 16 +++++++++--- api/server.py | 10 +++++--- workers/operations.py | 27 +++++++++++++------- workers/utils.py | 59 +++++++++++++++++++++++++++++++++++++------ 4 files changed, 87 insertions(+), 25 deletions(-) diff --git a/api/models.py b/api/models.py index 07b7ce7..0c84a38 100644 --- a/api/models.py +++ b/api/models.py @@ -17,18 +17,26 @@ class Timespan(BaseModel): class Status(Enum): - queued = "queued" - running = "running" + cancelled = "cancelled" complete = "complete" error = "error" + queued = "queued" + running = "running" + failed = "failed" + started = "started" + finished = "finished" @staticmethod def from_rq(rq_status): rq_status_to_tds_status = { + "cancelled": "cancelled", + "complete": "complete", + "error": "error", "queued": "queued", + "running": "running", + "failed": "failed", "started": "running", - "finished": "complete", - "error": "error", + "finished": "complete" } return Status(rq_status_to_tds_status[rq_status]) diff --git a/api/server.py b/api/server.py index 3e53517..ce48c77 100644 --- a/api/server.py +++ b/api/server.py @@ -46,19 +46,19 @@ def get_status(simulation_id: str) -> StatusSimulationIdGetResponse: from utils import fetch_job_status status = fetch_job_status(simulation_id) + print(status) if not isinstance(status, str): return status return {"status": Status.from_rq(status)} - +import logging @app.post("/simulate", response_model=JobResponse) -def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse: +def simulate_model(body: SimulatePostRequest) -> JobResponse: """ Perform a simulation """ from utils import create_job - # Parse request body engine = str(body.engine.value).lower() model_config_id = body.model_config_id @@ -72,6 +72,7 @@ def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse: "start": start, "end": end, "extra": body.extra.dict(), + "visual_options": True } resp = create_job(operation_name=operation_name, options=options) @@ -82,7 +83,7 @@ def simulate_model(body: SimulatePostRequest) -> SimulatePostResponse: @app.post("/calibrate", response_model=JobResponse) -def calibrate_model(body: CalibratePostRequest) -> CalibratePostResponse: +def calibrate_model(body: CalibratePostRequest) -> JobResponse: """ Calibrate a model """ @@ -106,6 +107,7 @@ def calibrate_model(body: CalibratePostRequest) -> CalibratePostResponse: "end": end, "dataset": dataset.dict(), "extra": extra, + "visual_options": True } resp = create_job(operation_name=operation_name, options=options) diff --git a/workers/operations.py b/workers/operations.py index 55c22b6..121b4f5 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -3,10 +3,15 @@ import os import urllib import sys - +import logging import numpy as np import requests -from utils import update_tds_status, parse_samples_into_csv, fetch_dataset, fetch_model, attach_files +from utils import update_tds_status,\ + parse_samples_into_csv,\ + fetch_dataset,\ + fetch_model,\ + attach_files,\ + catch_job_status from pyciemss.PetriNetODE.interfaces import ( load_and_calibrate_and_sample_petri_model, @@ -18,7 +23,7 @@ OUTPUT_FILENAME = os.getenv("PYCIEMSS_OUTPUT_FILEPATH") TDS_API = os.getenv("TDS_URL") - +@catch_job_status def simulate(*args, **kwargs): model_config_id = kwargs.pop("model_config_id") num_samples = kwargs.pop("num_samples") @@ -35,15 +40,19 @@ def simulate(*args, **kwargs): # Generate timepoints time_count = end - start - timepoints = map(float, range(1, time_count + 1)) + timepoints=[x for x in range(1,time_count+1)] - samples = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs) + output = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs) + samples = output.get('data') + schema = output.get('visual') + with open("visualization.json", "w") as f: + json.dump(schema, f, indent=2) samples.to_csv(OUTPUT_FILENAME, index=False) - attach_files({OUTPUT_FILENAME: "result.csv"}, TDS_API, TDS_SIMULATIONS, job_id) - - return True + attach_files({OUTPUT_FILENAME: "result.csv", "visualization.json": "visualization.json"}, TDS_API, TDS_SIMULATIONS, job_id) + return +@catch_job_status def calibrate_then_simulate(*args, **kwargs): model_config_id = kwargs.pop("model_config_id") start = kwargs.pop("start") @@ -59,7 +68,7 @@ def calibrate_then_simulate(*args, **kwargs): # Generate timepoints time_count = end - start - timepoints = map(float, range(1, time_count + 1)) + timepoints=[x for x in range(1,time_count+1)] dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API) diff --git a/workers/utils.py b/workers/utils.py index f6f2d60..2f4b24b 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -7,7 +7,12 @@ from datetime import datetime import pandas import numpy as np +import time +import logging +TDS_SIMULATIONS = "/simulations/" +OUTPUT_FILENAME = os.getenv("PYCIEMSS_OUTPUT_FILEPATH") +TDS_API = os.getenv("TDS_URL") def parse_samples_into_file(samples): # samples_str = json.dumps(samples) @@ -131,17 +136,55 @@ def fetch_dataset(dataset: dict, tds_api): return dataset_path -def attach_files(files: dict, tds_api, simulation_endpoint, job_id): +def attach_files(files: dict, tds_api, simulation_endpoint, job_id, status='complete'): sim_results_url = tds_api + simulation_endpoint + job_id - for (location, handle) in files.items(): - upload_url = f"{sim_results_url}/upload-url?filename={handle}" - upload_response = requests.get(upload_url) - presigned_upload_url = upload_response.json()["url"] - with open(location, "rb") as f: - upload_response = requests.put(presigned_upload_url, f) + + if status!="error": + for (location, handle) in files.items(): + upload_url = f"{sim_results_url}/upload-url?filename={handle}" + upload_response = requests.get(upload_url) + presigned_upload_url = upload_response.json()["url"] + with open(location, "rb") as f: + upload_response = requests.put(presigned_upload_url, f) # Update simulation object with status and filepaths. update_tds_status( - sim_results_url, status="complete", result_files=list(files.values()), finish=True + sim_results_url, status=status, result_files=list(files.values()), finish=True ) + + +def catch_job_status( function): + """ + decorator that catches failed wrapped rq jobs and make sure the simulation status is set in tds. + """ + def wrapped(*args, **kwargs): + try: + start_time = time.perf_counter() + result = function(*args, **kwargs) + end_time = time.perf_counter() + logging.info( + f"Elapsed time for {function.__name__}:", + end_time - start_time + ) + return result + except Exception as e: + + attach_files({OUTPUT_FILENAME: "result.csv"}, + TDS_API, + TDS_SIMULATIONS, + job_id=kwargs.get('job_id'), + status="error") + + log_message = f""" + ############################### + + There was an exception in CIEMSS Service + + Error occured in function: {function.__name__} + + ################################ + """ + logging.exception(log_message) + raise e + return wrapped \ No newline at end of file