From 2d72d873016a1796c146fe8086df1c405f7a96c4 Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 10:23:22 -0500 Subject: [PATCH 1/9] [broken] add ensemble-simulate --- api/server.py | 36 ++++++++++++++++++++++++++++++------ workers/operations.py | 10 ++++++++++ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/api/server.py b/api/server.py index 928d49a..73ad6aa 100644 --- a/api/server.py +++ b/api/server.py @@ -124,14 +124,38 @@ def calibrate_model(body: CalibratePostRequest) -> JobResponse: return response -@app.post("/ensemble", response_model=JobResponse) +@app.post("/ensemble-simulate", response_model=JobResponse) def create_ensemble(body: EnsemblePostRequest) -> JobResponse: """ - Perform an ensemble simulation + Perform ensemble simulate """ - return Response( - status_code=status.HTTP_501_NOT_IMPLEMENTED, - content="Ensemble is not yet implemented", - ) + from utils import create_job + + # Parse request body + print(body) + engine = str(body.engine).lower() + model_config_id = body.model_config_id + dataset = body.dataset + start = body.timespan.start + end = body.timespan.end + extra = body.extra.dict() + + + operation_name = "operations.calibrate_then_simulate" + options = { + "engine": engine, + "model_config_id": model_config_id, + "start": start, + "end": end, + "dataset": dataset.dict(), + "extra": extra, + "visual_options": True + } + + resp = create_job(operation_name=operation_name, options=options) + + response = {"simulation_id": resp["id"]} + + return response diff --git a/workers/operations.py b/workers/operations.py index 8c0883c..e30b834 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -87,3 +87,13 @@ def calibrate_then_simulate(*args, **kwargs): return True + + +@catch_job_status +def ensemble_simulate(*args, **kwargs): + return True + + +@catch_job_status +def ensemble_calibrate(*args, **kwargs): + return True From 7b267308fe3faf0fdceb7690766476c91e006709 Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 12:36:02 -0500 Subject: [PATCH 2/9] Flesh out models --- api/models.py | 63 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/api/models.py b/api/models.py index 96b908e..b6617c5 100644 --- a/api/models.py +++ b/api/models.py @@ -82,6 +82,42 @@ class Config: ) +class EnsembleSimulateExtra(BaseModel): + class Config: + extra = ExtraEnum.allow + + num_samples: int = Field( + 100, description="number of samples for a CIEMSS simulation", example=100 + ) + + +class EnsembleCalibrateExtra(BaseModel): + class Config: + extra = ExtraEnum.allow + + num_samples: int = Field( + 100, description="number of samples for a CIEMSS simulation", example=100 + ) + + total_population: int = Field( + 1000, description="total population", example=1000 + ) + + num_iterations: int = Field( + 350, description="number of iterations", example=1000 + ) + + time_unit: int = Field( + "days", description="units in numbers of days", example="days" + ) + + +class ModelConfig(BaseModel): + id: str = Field(..., example="cd339570-047d-11ee-be55") + solution_mappings: dict[str, str] = Field(..., example={"Infected": "Cases", "Hospitalizations": "hospitalized_population"}) + weight: float = Field(..., example="cd339570-047d-11ee-be55") + + class Dataset(BaseModel): id: str = Field(None, example="cd339570-047d-11ee-be55") filename: str = Field(None, example="dataset.csv") @@ -115,18 +151,29 @@ class CalibratePostRequest(BaseModel): ) -class EnsemblePostRequest(BaseModel): +class EnsembleSimulatePostRequest(BaseModel): engine: Engine = Field(..., example="ciemss") - model_configuration_ids: Optional[List[str]] = Field( + model_configs: List[ModelConfig] = Field( + [], + example=[], + ) + timespan: Timespan + + extra: EnsembleSimulateExtra = Field( None, - example=[ - "ba8da8d4-047d-11ee-be56", - "c1cd941a-047d-11ee-be56", - "c4b9f88a-047d-11ee-be56", - ], + description="optional extra system specific arguments for advanced use cases", + ) + + +class EnsembleCalibratePostRequest(BaseModel): + engine: Engine = Field(..., example="ciemss") + model_configs: List[ModelConfig] = Field( + [], + example=[], ) timespan: Timespan - extra: Optional[Dict[str, Any]] = Field( + dataset: Dataset + extra: EnsembleCalibrateExtra = Field( None, description="optional extra system specific arguments for advanced use cases", ) From b0582fe87230f63d651940ba84582674101b940e Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 14:25:19 -0500 Subject: [PATCH 3/9] Create both ensemble operations --- api/server.py | 48 +++++++++++++++++++++++---- workers/operations.py | 75 +++++++++++++++++++++++++++++++++++++++++++ workers/utils.py | 2 +- 3 files changed, 118 insertions(+), 7 deletions(-) diff --git a/api/server.py b/api/server.py index d268965..53f1418 100644 --- a/api/server.py +++ b/api/server.py @@ -10,6 +10,8 @@ CalibratePostRequest, EnsemblePostRequest, SimulatePostRequest, + EnsembleSimulatePostRequest, + EnsembleCalibratePostRequest, StatusSimulationIdGetResponse, ) @@ -139,29 +141,28 @@ def calibrate_model(body: CalibratePostRequest) -> JobResponse: @app.post("/ensemble-simulate", response_model=JobResponse) -def create_ensemble(body: EnsemblePostRequest) -> JobResponse: +def create_simulate_ensemble(body: EnsembleSimulatePostRequest) -> JobResponse: """ Perform ensemble simulate """ from utils import create_job # Parse request body - print(body) engine = str(body.engine).lower() - model_config_id = body.model_config_id - dataset = body.dataset + model_configs = [config.dict() for config in body.model_configs] start = body.timespan.start end = body.timespan.end + username = body.username extra = body.extra.dict() operation_name = "operations.calibrate_then_simulate" options = { "engine": engine, - "model_config_id": model_config_id, + "model_configs": model_configs, "start": start, "end": end, - "dataset": dataset.dict(), + "username": username, "extra": extra, "visual_options": True } @@ -173,4 +174,39 @@ def create_ensemble(body: EnsemblePostRequest) -> JobResponse: return response +@app.post("/ensemble-calibrate", response_model=JobResponse) +def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse: + """ + Perform ensemble simulate + """ + from utils import create_job + + # Parse request body + engine = str(body.engine).lower() + username = body.username + dataset = body.dataset.dict() + model_configs = [config.dict() for config in body.model_configs] + start = body.timespan.start + end = body.timespan.end + extra = body.extra.dict() + + + operation_name = "operations.calibrate_then_simulate" + options = { + "engine": engine, + "model_configs": model_configs, + "dataset": dataset, + "start": start, + "end": end, + "username": username, + "extra": extra, + "visual_options": True + } + + resp = create_job(operation_name=operation_name, options=options) + + response = {"simulation_id": resp["id"]} + + return response + diff --git a/workers/operations.py b/workers/operations.py index 6dffbf0..ea3c5ff 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -18,6 +18,11 @@ load_and_sample_petri_model, ) +from pyciemss.Ensemble.interfaces import ( + load_and_sample_petri_ensemble, load_and_calibrate_and_sample_ensemble_model +) + + TDS_CONFIGURATIONS = "/model_configurations/" TDS_SIMULATIONS = "/simulations/" OUTPUT_FILENAME = os.getenv("PYCIEMSS_OUTPUT_FILEPATH") @@ -93,9 +98,79 @@ def calibrate_then_simulate(*args, **kwargs): @catch_job_status def ensemble_simulate(*args, **kwargs): + model_configs = kwargs.pop("model_configs") + start = kwargs.pop("start") + end = kwargs.pop("end") + num_samples = kwargs.pop("num_samples") + username = kwargs.pop("username") + job_id = kwargs.pop("job_id") + + sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + + update_tds_status(sim_results_url, status="running", start=True) + + weights = [config["weight"] for config in model_configs] + solution_mappings = [config["solution_mappings"] for config in model_configs] + amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs] + + # Generate timepoints + time_count = end - start + timepoints=[x for x in range(1,time_count+1)] + + output = load_and_sample_petri_ensemble( + amr_paths, + weights, + solution_mappings, + num_samples + timepoints, + **kwargs + ) + samples = output.get('data') + schema = output.get('visual') + with open("visualization.json", "w") as f: + json.dump(schema, f, indent=2) + samples.to_csv(OUTPUT_FILENAME, index=False) + attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json"}, TDS_API, TDS_SIMULATIONS, job_id) return True @catch_job_status def ensemble_calibrate(*args, **kwargs): + model_configs = kwargs.pop("model_configs") + start = kwargs.pop("start") + end = kwargs.pop("end") + num_samples = kwargs.pop("num_samples") + dataset = kwargs.pop("dataset") + username = kwargs.pop("username") + job_id = kwargs.pop("job_id") + + sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + + update_tds_status(sim_results_url, status="running", start=True) + + weights = [config["weight"] for config in model_configs] + solution_mappings = [config["solution_mappings"] for config in model_configs] + amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs] + + dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API) + + # Generate timepoints + time_count = end - start + timepoints=[x for x in range(1,time_count+1)] + + output = load_and_calibrate_and_sample_ensemble_model( + amr_paths, + weights, + dataset_path + solution_mappings, + num_samples + timepoints, + **kwargs + ) + samples = output.get('data') + schema = output.get('visual') + with open("visualization.json", "w") as f: + json.dump(schema, f, indent=2) + samples.to_csv(OUTPUT_FILENAME, index=False) + attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json"}, TDS_API, TDS_SIMULATIONS, job_id) return True diff --git a/workers/utils.py b/workers/utils.py index d1a9bce..e60f52a 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -119,7 +119,7 @@ def fetch_model(model_config_id, tds_api, config_endpoint): for component in url_components: model_url = urllib.parse.urljoin(model_url, component) model_response = requests.get(model_url) - amr_path = os.path.abspath("./amr.json") + amr_path = os.path.abspath(f"./{model_config_id}.json") with open(amr_path, "w") as file: json.dump(model_response.json()["configuration"], file) return amr_path From 1951c4ec1081563e29b70fdc39136b9c0975d86a Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 14:30:31 -0500 Subject: [PATCH 4/9] Remove nonexistent import of model --- api/server.py | 1 - docker-compose.yaml | 5 ----- 2 files changed, 6 deletions(-) diff --git a/api/server.py b/api/server.py index 53f1418..5b9f5c0 100644 --- a/api/server.py +++ b/api/server.py @@ -8,7 +8,6 @@ Status, JobResponse, CalibratePostRequest, - EnsemblePostRequest, SimulatePostRequest, EnsembleSimulatePostRequest, EnsembleCalibratePostRequest, diff --git a/docker-compose.yaml b/docker-compose.yaml index 02fee43..21b5b65 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -3,9 +3,6 @@ networks: pyciemss: driver: bridge name: pyciemss - data-api: - #TODO Remove in production - external: true services: api: container_name: pyciemss-api @@ -18,7 +15,6 @@ services: - api.env networks: - pyciemss - - data-api # TODO Remove in production depends_on: - redis volumes: @@ -43,4 +39,3 @@ services: - redis networks: - pyciemss - - data-api From 97d26663624ecd98b30d93f95aa6480bd029739a Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 14:43:27 -0500 Subject: [PATCH 5/9] Fix job execution issues --- workers/operations.py | 6 +++--- workers/utils.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/workers/operations.py b/workers/operations.py index ea3c5ff..3124afb 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -121,7 +121,7 @@ def ensemble_simulate(*args, **kwargs): amr_paths, weights, solution_mappings, - num_samples + num_samples, timepoints, **kwargs ) @@ -161,9 +161,9 @@ def ensemble_calibrate(*args, **kwargs): output = load_and_calibrate_and_sample_ensemble_model( amr_paths, weights, - dataset_path + dataset_path, solution_mappings, - num_samples + num_samples, timepoints, **kwargs ) diff --git a/workers/utils.py b/workers/utils.py index e60f52a..d32a8fc 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -164,7 +164,7 @@ def wrapped(*args, **kwargs): result = function(*args, **kwargs) end_time = time.perf_counter() logging.info( - f"Elapsed time for {function.__name__} for {kwargs["username"]}:", + f"Elapsed time for {function.__name__} for {kwargs['username']}:", end_time - start_time ) return result @@ -183,7 +183,7 @@ def wrapped(*args, **kwargs): Error occured in function: {function.__name__} - Username: {kwargs["username"]} + Username: {kwargs['username']} ################################ """ From ec6933d473bed93147c167980ff91fbda039bcce Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 15:40:03 -0500 Subject: [PATCH 6/9] Redirect to correct job --- api/models.py | 2 ++ api/server.py | 4 ++-- workers/utils.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/api/models.py b/api/models.py index 864ba3c..1ca258f 100644 --- a/api/models.py +++ b/api/models.py @@ -158,6 +158,7 @@ class CalibratePostRequest(BaseModel): class EnsembleSimulatePostRequest(BaseModel): engine: Engine = Field(..., example="ciemss") + username: str = Field("not_provided", example="not_provided") model_configs: List[ModelConfig] = Field( [], example=[], @@ -172,6 +173,7 @@ class EnsembleSimulatePostRequest(BaseModel): class EnsembleCalibratePostRequest(BaseModel): engine: Engine = Field(..., example="ciemss") + username: str = Field("not_provided", example="not_provided") model_configs: List[ModelConfig] = Field( [], example=[], diff --git a/api/server.py b/api/server.py index 5b9f5c0..e0037c3 100644 --- a/api/server.py +++ b/api/server.py @@ -155,7 +155,7 @@ def create_simulate_ensemble(body: EnsembleSimulatePostRequest) -> JobResponse: extra = body.extra.dict() - operation_name = "operations.calibrate_then_simulate" + operation_name = "operations.ensemble_simulate" options = { "engine": engine, "model_configs": model_configs, @@ -190,7 +190,7 @@ def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse extra = body.extra.dict() - operation_name = "operations.calibrate_then_simulate" + operation_name = "operations.ensemble_calibrate" options = { "engine": engine, "model_configs": model_configs, diff --git a/workers/utils.py b/workers/utils.py index d32a8fc..7de159f 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -146,7 +146,8 @@ def attach_files(files: dict, tds_api, simulation_endpoint, job_id, status='comp presigned_upload_url = upload_response.json()["url"] with open(location, "rb") as f: upload_response = requests.put(presigned_upload_url, f) - + else: + logging.info(f"{job_id} ran into error") # Update simulation object with status and filepaths. update_tds_status( From c35bce064f9b79ae18e97f48693ad095eaa83f39 Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 17:05:41 -0500 Subject: [PATCH 7/9] Add better handling of simulation creation --- api/utils.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/api/utils.py b/api/utils.py index f33131f..6aa52a4 100644 --- a/api/utils.py +++ b/api/utils.py @@ -54,16 +54,17 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None): if STANDALONE: logging.info(f"OPTIONS: {options}") + # TODO: Allow extras on payload and simply put full object here ex_payload = { "engine": "ciemss", - "model_config_id": options.get("model_config_id"), + "model_config_id": options.get("model_config_id", "not_provided"), "timespan": { - "start": options.get("start"), - "end": options.get("end"), + "start": options.get("start", 0), + "end": options.get("end", 1), }, - "extra": options.get("extra"), + "extra": options.get("extra", None), } - post_url = TDS_API + TDS_SIMULATIONS + job_id + post_url = TDS_API + TDS_SIMULATIONS #+ job_id payload = { "id": job_id, "execution_payload": ex_payload, @@ -75,7 +76,10 @@ def create_job(operation_name: str, options: Optional[Dict[Any, Any]] = None): } logging.info(payload) sys.stdout.flush() - logging.info(requests.put(post_url, json=json.loads(json.dumps(payload))).content) + response = requests.post(post_url, json=payload) + if response.status_code >= 300: + raise Exception(f"Failed to create simulation on TDS (status: {response.status_code}): {json.dumps(payload)}") + logging.info(response.content) if job and force_restart: job.cleanup(ttl=0) # Cleanup/remove data immediately From f772e0077f44402584e91b0aeb9588646e4b2958 Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 17:46:45 -0500 Subject: [PATCH 8/9] Fix ensemble-calibrate --- api/models.py | 4 ++-- workers/operations.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/models.py b/api/models.py index 1ca258f..0a2324f 100644 --- a/api/models.py +++ b/api/models.py @@ -121,8 +121,8 @@ class ModelConfig(BaseModel): class Dataset(BaseModel): id: str = Field(None, example="cd339570-047d-11ee-be55") filename: str = Field(None, example="dataset.csv") - mappings: Optional[Dict[str, str]] = Field( - None, + mappings: Dict[str, str] = Field( + default_factory=dict, description="Mappings from the dataset column names to the model names they should be replaced with.", example={'postive_tests': 'infected'}, ) diff --git a/workers/operations.py b/workers/operations.py index 3124afb..4957b51 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -152,7 +152,7 @@ def ensemble_calibrate(*args, **kwargs): solution_mappings = [config["solution_mappings"] for config in model_configs] amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs] - dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API) + dataset_path = fetch_dataset(dataset, TDS_API) # Generate timepoints time_count = end - start From 52442a5775c792629ace17eda334b31d550e7414 Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Mon, 17 Jul 2023 17:50:59 -0500 Subject: [PATCH 9/9] Revert docker compose --- docker-compose.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index 21b5b65..02fee43 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -3,6 +3,9 @@ networks: pyciemss: driver: bridge name: pyciemss + data-api: + #TODO Remove in production + external: true services: api: container_name: pyciemss-api @@ -15,6 +18,7 @@ services: - api.env networks: - pyciemss + - data-api # TODO Remove in production depends_on: - redis volumes: @@ -39,3 +43,4 @@ services: - redis networks: - pyciemss + - data-api