Skip to content

Commit

Permalink
Create directories for each job (#23)
Browse files Browse the repository at this point in the history
* Save to temp with job name to ensure no overlap between jobs

* Fix null return of path
  • Loading branch information
fivegrant authored Jul 20, 2023
1 parent f45b97a commit 22c9191
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
61 changes: 39 additions & 22 deletions workers/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import numpy as np
import requests
from utils import (
make_job_dir,
update_tds_status,
fetch_dataset,
fetch_model,
Expand Down Expand Up @@ -44,11 +45,15 @@ def simulate(*args, **kwargs):
logging.debug(f"{job_id} (username - {username}): start simulate")

sim_results_url = TDS_API + TDS_SIMULATIONS + job_id
job_dir = make_job_dir(job_id)
output_filename = os.path.join(job_dir, OUTPUT_FILENAME)
eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME)
visualization_filename = os.path.join(job_dir, "./visualization.json")

update_tds_status(sim_results_url, status="running", start=True)

# Get model from TDS
amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS)
amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS, job_dir)

# Generate timepoints
time_count = end - start
Expand All @@ -57,12 +62,12 @@ def simulate(*args, **kwargs):
output = load_and_sample_petri_model(amr_path, num_samples, timepoints=timepoints, **kwargs)
samples = output.get('data')
schema = output.get('visual')
with open("visualization.json", "w") as f:
with open(visualization_filename, "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(OUTPUT_FILENAME, index=False)
samples.to_csv(output_filename, index=False)
eval = output.get('quantiles')
eval.to_csv(EVAL_OUTPUT_FILENAME, index=False)
attach_files({OUTPUT_FILENAME: "result.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id)
eval.to_csv(eval_output_filename, index=False)
attach_files({output_filename: "result.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id)
logging.debug(f"{job_id} (username - {username}): finish simulate")

return
Expand All @@ -78,16 +83,20 @@ def calibrate_then_simulate(*args, **kwargs):
logging.debug(f"{job_id} (username - {username}): start calibrate")

sim_results_url = TDS_API + TDS_SIMULATIONS + job_id
job_dir = make_job_dir(job_id)
output_filename = os.path.join(job_dir, OUTPUT_FILENAME)
eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME)
visualization_filename = os.path.join(job_dir, "./visualization.json")

update_tds_status(sim_results_url, status="running", start=True)

amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS)
amr_path = fetch_model(model_config_id, TDS_API, TDS_CONFIGURATIONS, job_dir)

# Generate timepoints
time_count = end - start
timepoints=[x for x in range(1,time_count+1)]

dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API)
dataset_path = fetch_dataset(kwargs.pop("dataset"), TDS_API, job_dir)

output = load_and_calibrate_and_sample_petri_model(
amr_path,
Expand All @@ -97,12 +106,12 @@ def calibrate_then_simulate(*args, **kwargs):
)
samples = output.get('data')
schema = output.get('visual')
with open("visualization.json", "w") as f:
with open(visualization_filename, "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(OUTPUT_FILENAME, index=False)
samples.to_csv(output_filename, index=False)
eval = output.get('quantiles')
eval.to_csv(EVAL_OUTPUT_FILENAME, index=False)
attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id)
eval.to_csv(eval_output_filename, index=False)
attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id)

logging.debug(f"{job_id} (username - {username}): finish calibrate")

Expand All @@ -120,12 +129,16 @@ def ensemble_simulate(*args, **kwargs):
logging.debug(f"{job_id} (username - {username}): start ensemble simulate")

sim_results_url = TDS_API + TDS_SIMULATIONS + job_id
job_dir = make_job_dir(job_id)
output_filename = os.path.join(job_dir, OUTPUT_FILENAME)
eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME)
visualization_filename = os.path.join(job_dir, "./visualization.json")

update_tds_status(sim_results_url, status="running", start=True)

weights = [config["weight"] for config in model_configs]
solution_mappings = [config["solution_mappings"] for config in model_configs]
amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs]
amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS, job_dir) for config in model_configs]

# Generate timepoints
time_count = end - start
Expand All @@ -141,12 +154,12 @@ def ensemble_simulate(*args, **kwargs):
)
samples = output.get('data')
schema = output.get('visual')
with open("visualization.json", "w") as f:
with open(visualization_filename, "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(OUTPUT_FILENAME, index=False)
samples.to_csv(output_filename, index=False)
eval = output.get('quantiles')
eval.to_csv(EVAL_OUTPUT_FILENAME, index=False)
attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id)
eval.to_csv(eval_output_filename, index=False)
attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id)
logging.debug(f"{job_id} (username - {username}): finish ensemble simulate")
return True

Expand All @@ -163,14 +176,18 @@ def ensemble_calibrate(*args, **kwargs):
logging.debug(f"{job_id} (username - {username}): start ensemble calibrate")

sim_results_url = TDS_API + TDS_SIMULATIONS + job_id
job_dir = make_job_dir(job_id)
output_filename = os.path.join(job_dir, OUTPUT_FILENAME)
eval_output_filename = os.path.join(job_dir, EVAL_OUTPUT_FILENAME)
visualization_filename = os.path.join(job_dir, "./visualization.json")

update_tds_status(sim_results_url, status="running", start=True)

weights = [config["weight"] for config in model_configs]
solution_mappings = [config["solution_mappings"] for config in model_configs]
amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS) for config in model_configs]
amr_paths = [fetch_model(config["id"], TDS_API, TDS_CONFIGURATIONS, job_dir) for config in model_configs]

dataset_path = fetch_dataset(dataset, TDS_API)
dataset_path = fetch_dataset(dataset, TDS_API, job_dir)

# Generate timepoints
time_count = end - start
Expand All @@ -187,11 +204,11 @@ def ensemble_calibrate(*args, **kwargs):
)
samples = output.get('data')
schema = output.get('visual')
with open("visualization.json", "w") as f:
with open(visualization_filename, "w") as f:
json.dump(schema, f, indent=2)
samples.to_csv(OUTPUT_FILENAME, index=False)
samples.to_csv(output_filename, index=False)
eval = output.get('quantiles')
eval.to_csv(EVAL_OUTPUT_FILENAME, index=False)
attach_files({OUTPUT_FILENAME: "simulation.csv", "visualization.json": "visualization.json", EVAL_OUTPUT_FILENAME: EVAL_OUTPUT_FILENAME}, TDS_API, TDS_SIMULATIONS, job_id)
eval.to_csv(eval_output_filename, index=False)
attach_files({output_filename: "simulation.csv", visualization_filename: "visualization.json", eval_output_filename: "eval.csv"}, TDS_API, TDS_SIMULATIONS, job_id)
logging.debug(f"{job_id} (username - {username}): finish ensemble calibrate")
return True
15 changes: 11 additions & 4 deletions workers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def make_job_dir(job_id):
    """Create (if needed) and return the per-job scratch directory.

    Each job gets its own directory under /tmp keyed by its id, so
    concurrent jobs never clobber each other's output files.

    Args:
        job_id: unique job identifier; coerced to str for the path.

    Returns:
        The directory path, e.g. "/tmp/<job_id>".
    """
    path = os.path.join("/tmp", str(job_id))
    # exist_ok=True: a retried/restarted job must not crash with
    # FileExistsError just because its directory survived a prior attempt.
    os.makedirs(path, exist_ok=True)
    return path


def update_tds_status(url, status, result_files=[], start=False, finish=False):
logging.debug(f"Updating simulation `{url}` -- {status} start: {start}; finish: {finish}; result_files: {result_files}")
tds_payload = requests.get(url)
Expand All @@ -38,26 +45,26 @@ def update_tds_status(url, status, result_files=[], start=False, finish=False):
return update_response


def fetch_model(model_config_id, tds_api, config_endpoint):
def fetch_model(model_config_id, tds_api, config_endpoint, job_dir):
    """Download a model configuration from TDS and save it into the job dir.

    Args:
        model_config_id: identifier of the model configuration to fetch.
        tds_api: base URL of the TDS API.
        config_endpoint: path component of the configurations endpoint.
        job_dir: per-job scratch directory the AMR JSON is written into.

    Returns:
        Path of the written JSON file, "<job_dir>/<model_config_id>.json".
    """
    logging.debug(f"Fetching model {model_config_id}")
    url_components = [tds_api, config_endpoint, model_config_id]
    model_url = ""
    for component in url_components:
        # NOTE(review): urljoin replaces the final path segment unless the
        # base ends with "/" — this assumes tds_api/config_endpoint carry
        # trailing slashes; verify against the constants' definitions.
        model_url = urllib.parse.urljoin(model_url, component)
    model_response = requests.get(model_url)
    # Plain filename join; the previous "./" prefix was redundant inside
    # os.path.join and produced non-normalized paths like "/tmp/x/./y.json".
    amr_path = os.path.join(job_dir, f"{model_config_id}.json")
    with open(amr_path, "w") as file:
        # Only the "configuration" payload is persisted, not the envelope.
        json.dump(model_response.json()["configuration"], file)
    return amr_path


def fetch_dataset(dataset: dict, tds_api):
def fetch_dataset(dataset: dict, tds_api, job_dir):
    """Download a TDS dataset, apply column renames, and save it in job_dir.

    Args:
        dataset: mapping with at least "id", "filename", and "mappings"
            (old-column -> new-column renames) keys.
        tds_api: base URL of the TDS API.
        job_dir: per-job scratch directory the dataset file is written into.

    Returns:
        Path of the saved dataset file.
    """
    logging.debug(f"Fetching dataset {dataset['id']}")
    # TDS returns a presigned download URL; the data itself is read from there.
    dataset_url = f"{tds_api}/datasets/{dataset['id']}/download-url?filename={dataset['filename']}"
    response = requests.get(dataset_url)
    df = pandas.read_csv(response.json()["url"])
    df = df.rename(columns=dataset["mappings"])
    # NOTE(review): the file holds CSV data despite the ".json" name; the
    # name is kept as-is since downstream consumers may rely on it — confirm.
    # The previous "./" prefix was redundant inside os.path.join.
    dataset_path = os.path.join(job_dir, "temp.json")
    with open(dataset_path, "w") as file:
        df.to_csv(file, index=False)
    return dataset_path
Expand Down

0 comments on commit 22c9191

Please sign in to comment.