Skip to content

Commit

Permalink
new parallel runner
Browse files Browse the repository at this point in the history
  • Loading branch information
GregGlickert committed Nov 20, 2024
1 parent 3aa3e39 commit b78659c
Show file tree
Hide file tree
Showing 14 changed files with 5,720 additions and 2,367 deletions.
93 changes: 77 additions & 16 deletions bmtool/SLURM.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import subprocess
import json
import requests
import shutil


def check_job_status(job_id):
Expand Down Expand Up @@ -106,7 +107,6 @@ def edit_json(self, new_value):

print(f"JSON file '{self.json_file_path}' modified successfully with {self.param_name}={new_value}.", flush=True)


def change_json_file_path(self,new_json_file_path):
self.json_file_path = new_json_file_path

Expand Down Expand Up @@ -156,8 +156,7 @@ def edit_all_jsons(self, new_value):


class SimulationBlock:
def __init__(self, block_name, time, partition, nodes, ntasks, mem, simulation_cases, output_base_dir,account=None,additional_commands=None,
status_list = ['COMPLETED', 'FAILED', 'CANCELLED']):
def __init__(self, block_name, time, partition, nodes, ntasks, mem, simulation_cases, output_base_dir,account=None,additional_commands=None,status_list = ['COMPLETED', 'FAILED', 'CANCELLED'],component_path=None):
"""
Initializes the SimulationBlock instance.
Expand Down Expand Up @@ -187,6 +186,7 @@ def __init__(self, block_name, time, partition, nodes, ntasks, mem, simulation_c
self.additional_commands = additional_commands if additional_commands is not None else []
self.status_list = status_list
self.job_ids = []
self.component_path = component_path

def create_batch_script(self, case_name, command):
"""
Expand All @@ -207,6 +207,8 @@ def create_batch_script(self, case_name, command):
additional_commands_str = "\n".join(self.additional_commands)
# Conditional account line
account_line = f"#SBATCH --account={self.account}\n" if self.account else ""
env_var_component_path = f"export COMPONENT_PATH={self.component_path}" if self.component_path else ""


# Write the batch script to the file
with open(batch_script_path, 'w') as script_file:
Expand All @@ -224,6 +226,9 @@ def create_batch_script(self, case_name, command):
# Additional user-defined commands
{additional_commands_str}
#enviroment vars
{env_var_component_path}
export OUTPUT_DIR={case_output_dir}
{command}
Expand Down Expand Up @@ -272,7 +277,6 @@ def check_block_completed(self):
return False
return True


def check_block_running(self):
"""checks if a job is running
Expand All @@ -284,9 +288,21 @@ def check_block_running(self):
if status != 'RUNNING': #
return False
return True

def check_block_submited(self):
    """Check whether every job in this block is still pending in the queue.

    Returns:
        bool: True if every job id in ``self.job_ids`` reports the status
            'PENDING' (also True when there are no job ids); False as soon
            as any job reports a different status.
    """
    # all() stops at the first non-pending job, so no further status
    # queries are issued once the answer is known.
    return all(check_job_status(job_id) == 'PENDING' for job_id in self.job_ids)

class SequentialBlockRunner:

class BlockRunner:
"""
Class to handle submitting multiple blocks, either sequentially or in parallel.
Expand All @@ -297,17 +313,23 @@ class SequentialBlockRunner:
webhook (str): a microsoft webhook for teams. When used will send teams messages to the hook!
"""

def __init__(self, blocks, json_editor=None, param_values=None, check_interval=200,webhook=None):
# Store the configuration used when submitting a group of blocks.
#   blocks:         the blocks to submit; each one is submitted via submit_block().
#   json_editor:    stored as-is; NOTE(review): not used by the submit code visible
#                   here, which builds its own seedSweep/multiSeedSweep - confirm
#                   whether this argument is still needed.
#   json_file_path: path of the JSON file handed to seedSweep / multiSeedSweep.
#   param_name:     name of the parameter handed to seedSweep / multiSeedSweep.
#   param_values:   one value per block, looked up by the block's position.
#   check_interval: value passed to time.sleep() between completion checks.
#   syn_dict_list:  when None a seedSweep edits the JSON; otherwise a
#                   multiSeedSweep is built with this list.
#   webhook:        when set, status messages are sent with send_teams_message().
def __init__(self, blocks, json_editor=None,json_file_path=None, param_name=None,
param_values=None, check_interval=60,syn_dict_list = None,
webhook=None):
self.blocks = blocks
self.json_editor = json_editor
self.param_values = param_values
self.check_interval = check_interval
self.webhook = webhook
self.param_name = param_name
self.json_file_path = json_file_path
self.syn_dict_list = syn_dict_list

def submit_blocks_sequentially(self):
"""
Submits all blocks sequentially, ensuring each block starts only after the previous block has completed.
Submits all blocks sequentially, ensuring each block starts only after the previous block has completed or is running.
Updates the JSON file with new parameters before each block run.
json file path should be the path WITH the components folder
"""
for i, block in enumerate(self.blocks):
# Update JSON file with new parameter value
Expand All @@ -317,15 +339,14 @@ def submit_blocks_sequentially(self):
if len(self.blocks) != len(self.param_values):
raise Exception("Number of blocks needs to each number of params given")
new_value = self.param_values[i]
# NGL didnt test the multi but should work
if isinstance(self.json_editor, multiSeedSweep):
self.json_editor.edit_all_jsons(new_value)
elif isinstance(self.json_editor,seedSweep):
print(f"Updating JSON file with parameter value for block: {block.block_name}", flush=True)
self.json_editor.edit_json(new_value)
else:
raise Exception("json editor provided but not a seedSweep class not sure what your doing?!?")

if self.syn_dict_list == None:
json_editor = seedSweep(self.json_file_path, self.param_name)
json_editor.edit_json(new_value)
else:
json_editor = multiSeedSweep(self.json_file_path,self.param_name,
self.syn_dict_list,base_ratio=1)
json_editor.edit_all_jsons(new_value)

# Submit the block
print(f"Submitting block: {block.block_name}", flush=True)
Expand All @@ -335,7 +356,7 @@ def submit_blocks_sequentially(self):
send_teams_message(self.webhook,message)

# Wait for the block to complete
if i == len(self.blocks) - 1: # Corrected index to check the last block
if i == len(self.blocks) - 1:
while not block.check_block_completed():
print(f"Waiting for the last block {i} to complete...")
time.sleep(self.check_interval)
Expand All @@ -350,3 +371,43 @@ def submit_blocks_sequentially(self):
message = "SIMULATION UPDATE: Simulation are Done!"
send_teams_message(self.webhook,message)

def submit_blocks_parallel(self):
    """
    Submit every block to the queue at once, then wait for all of them to finish.

    Each block gets its own copy of its components directory (the original
    path with the 1-based block index appended, e.g. ``components`` ->
    ``components1``) and the block's parameter value is written into the JSON
    file inside that copy. ``self.json_file_path`` must therefore be the path
    relative to the components directory.

    Raises:
        Exception: if any block has no ``component_path``.
        ValueError: if the number of parameter values differs from the
            number of blocks.
        FileExistsError: raised by ``shutil.copytree`` when a cloned
            components directory already exists (e.g. from an earlier run).
    """
    blocks = self.blocks

    # Validate the whole configuration before touching the filesystem or the
    # queue, so a bad setup cannot leave a half-submitted sweep behind.
    if any(block.component_path is None for block in blocks):
        raise Exception("Unable to use parallel submitter without defining the component path")
    n_values = 0 if self.param_values is None else len(self.param_values)
    if n_values != len(blocks):
        raise ValueError(
            f"Number of blocks ({len(blocks)}) needs to equal the number of "
            f"param values given ({n_values})"
        )

    for i, block in enumerate(blocks):
        new_value = self.param_values[i]

        source_dir = block.component_path
        destination_dir = f"{source_dir}{i+1}"

        # Clone the components folder first; only point the block at the
        # clone once the copy has actually succeeded.
        shutil.copytree(source_dir, destination_dir)
        block.component_path = destination_dir

        json_file_path = os.path.join(destination_dir, self.json_file_path)
        if self.syn_dict_list is None:
            json_editor = seedSweep(json_file_path, self.param_name)
            json_editor.edit_json(new_value)
        else:
            json_editor = multiSeedSweep(json_file_path, self.param_name,
                                         self.syn_dict_list, base_ratio=1)
            json_editor.edit_all_jsons(new_value)

        # Submit the block with its new component path.
        print(f"Submitting block: {block.block_name}", flush=True)
        block.submit_block()

    if self.webhook:
        message = "SIMULATION UPDATE: Simulations have been submited in parallel!"
        send_teams_message(self.webhook, message)

    # Blocks run concurrently and may finish in any order, so every block has
    # to be checked - waiting only on the last one could report "Done" while
    # earlier blocks are still running.
    for block in blocks:
        while not block.check_block_completed():
            print(f"Waiting for block {block.block_name} to complete...", flush=True)
            time.sleep(self.check_interval)

    if self.webhook:
        message = "SIMULATION UPDATE: Simulations are Done!"
        send_teams_message(self.webhook, message)


Loading

0 comments on commit b78659c

Please sign in to comment.