From 22c91911f2997fa0e23649b2f6c4b489b637cbdc Mon Sep 17 00:00:00 2001 From: Five Grant <5@fivegrant.com> Date: Thu, 20 Jul 2023 08:12:46 -0500 Subject: [PATCH] Create directories for each job (#23) * Save to temp with job name to ensure no overlap between jobs * Fix null return of path --- workers/operations.py | 61 +++++++++++++++++++++++++++---------------- workers/utils.py | 15 ++++++++--- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/workers/operations.py b/workers/operations.py index 2ed3d7d..84da4dd 100644 --- a/workers/operations.py +++ b/workers/operations.py @@ -7,6 +7,7 @@ import numpy as np import requests from utils import ( + make_job_dir, update_tds_status, fetch_dataset, fetch_model, @@ -44,11 +45,15 @@ def simulate(*args, **kwargs): logging.debug(f"{job_id} (username - {username}): start simulate") sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + job_dir = make_job_dir(job_id) + output_filename = os.path.join(job_dir, OUTPUT_FILENAME) + eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME) + visualization_filename = os.path.join(job_dir, "./visualization.json") update_tds_status(sim_results_url, status="running", start=True) # Get model from TDS - amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS) + amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS, job_dir) # Generate timepoints time_count = end - start @@ -57,12 +62,12 @@ def simulate(*args, **kwargs): output = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs) samples = output.get('data') schema = output.get('visual') - with open("visualization.json", "w") as f: + with open(visualization_filename, "w") as f: json.dump(schema, f, indent=2) - samples.to_csv(OUTPUT_FILENAME, index=False) + samples.to_csv(output_filename, index=False) eval = output.get('quantiles') - eval.to_csv(EVAL_OUTPUT_FILENAME, index=False) - attach_files({OUTPUT_FILENAME: "result.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id) + eval.to_csv(eval_output_filename, index=False) + attach_files({output_filename: "result.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id) logging.debug(f"{job_id} (username - {username}): finish simulate") return @@ -78,16 +83,20 @@ def calibrate_then_simulate(*args, **kwargs): logging.debug(f"{job_id} (username - {username}): start calibrate") sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + job_dir = make_job_dir(job_id) + output_filename = os.path.join(job_dir, OUTPUT_FILENAME) + eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME) + visualization_filename = os.path.join(job_dir, "./visualization.json") update_tds_status(sim_results_url, status="running", start=True) - amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS) + amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS, job_dir) # Generate timepoints time_count = end - start timepoints=[x for x in range(1,time_count+1)] - dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API) + dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API, job_dir) output = load_and_calibrate_and_sample_petri_model( amr_path, @@ -97,12 +106,12 @@ def calibrate_then_simulate(*args, **kwargs): ) samples = output.get('data') schema = output.get('visual') - with open("visualization.json", "w") as f: + with open(visualization_filename, "w") as f: json.dump(schema, f, indent=2) - samples.to_csv(OUTPUT_FILENAME, index=False) + samples.to_csv(output_filename, index=False) eval = output.get('quantiles') - eval.to_csv(EVAL_OUTPUT_FILENAME, index=False) - attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id) + eval.to_csv(eval_output_filename, index=False) + attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id) logging.debug(f"{job_id} (username - {username}): finish calibrate") @@ -120,12 +129,16 @@ def ensemble_simulate(*args, **kwargs): logging.debug(f"{job_id} (username - {username}): start ensemble simulate") sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + job_dir = make_job_dir(job_id) + output_filename = os.path.join(job_dir, OUTPUT_FILENAME) + eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME) + visualization_filename = os.path.join(job_dir, "./visualization.json") update_tds_status(sim_results_url, status="running", start=True) weights = [config["weight"] for config in model_configs] solution_mappings = [config["solution_mappings"] for config in model_configs] - amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs] + amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS, job_dir) for config in model_configs] # Generate timepoints time_count = end - start @@ -141,12 +154,12 @@ def ensemble_simulate(*args, **kwargs): ) samples = output.get('data') schema = output.get('visual') - with open("visualization.json", "w") as f: + with open(visualization_filename, "w") as f: json.dump(schema, f, indent=2) - samples.to_csv(OUTPUT_FILENAME, index=False) + samples.to_csv(output_filename, index=False) eval = output.get('quantiles') - eval.to_csv(EVAL_OUTPUT_FILENAME, index=False) - attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id) + eval.to_csv(eval_output_filename, index=False) + attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id) logging.debug(f"{job_id} (username - {username}): finish ensemble simulate") return True @@ -163,14 +176,18 @@ def ensemble_calibrate(*args, **kwargs): logging.debug(f"{job_id} (username - {username}): start ensemble calibrate") sim_results_url = TDS_API + TDS_SIMULATIONS + job_id + job_dir = make_job_dir(job_id) + output_filename = os.path.join(job_dir, OUTPUT_FILENAME) + eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME) + visualization_filename = os.path.join(job_dir, "./visualization.json") update_tds_status(sim_results_url, status="running", start=True) weights = [config["weight"] for config in model_configs] solution_mappings = [config["solution_mappings"] for config in model_configs] - amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs] + amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS, job_dir) for config in model_configs] - dataset_path = fetch_dataset(dataset, TDS_API) + dataset_path = fetch_dataset(dataset, TDS_API, job_dir) # Generate timepoints time_count = end - start @@ -187,11 +204,11 @@ def ensemble_calibrate(*args, **kwargs): ) samples = output.get('data') schema = output.get('visual') - with open("visualization.json", "w") as f: + with open(visualization_filename, "w") as f: json.dump(schema, f, indent=2) - samples.to_csv(OUTPUT_FILENAME, index=False) + samples.to_csv(output_filename, index=False) eval = output.get('quantiles') - eval.to_csv(EVAL_OUTPUT_FILENAME, index=False) - attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id) + eval.to_csv(eval_output_filename, index=False) + attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id) logging.debug(f"{job_id} (username - {username}): finish ensemble calibrate") return True diff --git a/workers/utils.py b/workers/utils.py index a701344..4a0689e 100644 --- a/workers/utils.py +++ b/workers/utils.py @@ -17,6 +17,13 @@ logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) + +def make_job_dir(job_id): + path = os.path.join("/tmp", str(job_id)) + os.makedirs(path) + return path + + def update_tds_status(url, status, result_files=[], start=False, finish=False): logging.debug(f"Updating simulation `{url}` -- {status} start: {start}; finish: {finish}; result_files: {result_files}") tds_payload = requests.get(url) @@ -38,26 +45,26 @@ def update_tds_status(url, status, result_files=[], start=False, finish=False): return update_response -def fetch_model(model_config_id, tds_api, config_endpoint): +def fetch_model(model_config_id, tds_api, config_endpoint, job_dir): logging.debug(f"Fetching model {model_config_id}") url_components = [tds_api, config_endpoint, model_config_id] model_url = "" for component in url_components: model_url = urllib.parse.urljoin(model_url, component) model_response = requests.get(model_url) - amr_path = os.path.abspath(f"./{model_config_id}.json") + amr_path = os.path.join(job_dir, f"./{model_config_id}.json") with open(amr_path, "w") as file: json.dump(model_response.json()["configuration"], file) return amr_path -def fetch_dataset(dataset: dict, tds_api): +def fetch_dataset(dataset: dict, tds_api, job_dir): logging.debug(f"Fetching dataset {dataset['id']}") dataset_url = f"{tds_api}/datasets/{dataset['id']}/download-url?filename={dataset['filename']}" response = requests.get(dataset_url) df = pandas.read_csv(response.json()["url"]) df = df.rename(columns=dataset["mappings"]) - dataset_path = os.path.abspath("./temp.json") + dataset_path = os.path.join(job_dir, "./temp.json") with open(dataset_path, "w") as file: df.to_csv(file, index=False) return dataset_path