DAOS-16167 test: update soak test to use internal job scheduler
Skip-unit-tests: true
Skip-fault-injection-test: true
Test-tag: soak_smoke

Required-githooks: true

Signed-off-by: Maureen Jean <maureen.jean@intel.com>
mjean308 committed Jul 23, 2024
1 parent 101bec4 commit 47e9f7c
Showing 3 changed files with 348 additions and 110 deletions.
1 change: 1 addition & 0 deletions src/tests/ftest/util/job_manager_utils.py
@@ -473,6 +473,7 @@ def __init__(self, job, subprocess=False, mpi_type="openmpi"):
self.tmpdir_base = FormattedParameter("--mca orte_tmpdir_base {}", None)
self.args = BasicParameter(None, None)
self.mpi_type = mpi_type
self.hostlist = FormattedParameter("-hosts {}", None)

def assign_hosts(self, hosts, path=None, slots=None, hostfile=True):
"""Assign the hosts to use with the command (-f).
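For context on this one-line change: hostlist is a FormattedParameter like the other mpirun options in this class, so giving it a value adds a matching -hosts option to the rendered command line, while leaving it at None omits the option entirely. A minimal usage sketch, assuming the standard ftest FormattedParameter semantics and the Mpirun wrapper defined in this module (the variable names and host values are illustrative, not part of this diff):

# Hypothetical usage of the new hostlist parameter on an mpirun job manager.
from job_manager_utils import Mpirun

mpirun = Mpirun(app_cmd, mpi_type="mpich")     # app_cmd: some command object to launch
mpirun.hostlist.update("node-1,node-2")        # rendered command gains: -hosts node-1,node-2
mpirun.hostlist.update(None)                   # -hosts is omitted from the command line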
225 changes: 158 additions & 67 deletions src/tests/ftest/util/soak_test_base.py
@@ -26,9 +26,10 @@
create_app_cmdline, create_dm_cmdline, create_fio_cmdline,
create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline,
create_racer_cmdline, ddhhmmss_format, get_daos_server_logs, get_harassers,
get_journalctl, launch_exclude_reintegrate, launch_extend, launch_reboot,
launch_server_stop_start, launch_snapshot, launch_vmd_identify_check,
reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check)
get_id, get_journalctl, launch_exclude_reintegrate, launch_extend,
launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot,
launch_vmd_identify_check, reserved_file_copy, run_event_check,
run_metrics_check, run_monitor_check)


class SoakTestBase(TestWithServers):
@@ -78,6 +79,8 @@ def __init__(self, *args, **kwargs):
self.soak_log_dir = None
self.soak_dir = None
self.enable_scrubber = False
self.job_scheduler = None
self.Job_List = None

GitHub Actions Pylint check warning on line 83: invalid-name, Attribute name "Job_List" doesn't conform to snake_case naming style

def setUp(self):
"""Define test setup to be done."""
@@ -96,6 +99,7 @@ def setUp(self):
self.sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop)
# Initialize dmg cmd
self.dmg_command = self.get_dmg_command()
self.job_scheduler = self.params.get("job_scheduler", "/run/*", default="slurm")
# Fail if slurm partition is not defined
# NOTE: Slurm reservation and partition are created before soak runs.
# CI uses partition=daos_client and no reservation.
@@ -132,7 +136,7 @@ def pre_tear_down(self):
self.log.info("<<preTearDown Started>> at %s", time.ctime())
errors = []
# clear out any jobs in squeue;
if self.failed_job_id_list:
if self.failed_job_id_list and self.job_scheduler == "slurm":
job_id = " ".join([str(job) for job in self.failed_job_id_list])
self.log.info("<<Cancel jobs in queue with ids %s >>", job_id)
cmd = "scancel --partition {} -u {} {}".format(
@@ -296,6 +300,59 @@ def harasser_job_done(self, args):
self.harasser_results[args["name"]] = args["status"]
self.harasser_args[args["name"]] = args["vars"]

def schedule_jobs(self):
"""Schedule jobs with internal scheduler."""
self.log.debug("DBG: schedule_jobs ENTERED ")
job_queue = multiprocessing.Queue()
jobid_list = []
node_list = self.hostlist_clients
for job_dict in self.Job_List:
jobid_list.append(job_dict["jobid"])
self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}")

GitHub Actions Pylint check warning on line 311: logging-fstring-interpolation, Use lazy % formatting in logging functions
while True:
if time.time() > self.end_time or len(jobid_list) == 0:
break
jobs = []
job_results = {}
for job_dict in self.Job_List:
job_id = job_dict["jobid"]
if job_id in jobid_list:
node_count = job_dict["nodesperjob"]
if len(node_list) >= node_count:
self.log.debug(f"DBG: node_count {node_count}")

GitHub Actions Pylint check warning on line 322: logging-fstring-interpolation, Use lazy % formatting in logging functions
self.log.debug(f"DBG: node_list initial/queue {node_list}")

GitHub Actions Pylint check warning on line 323: logging-fstring-interpolation, Use lazy % formatting in logging functions
job_node_list = node_list[:node_count]
self.log.debug(f"DBG: node_list before launch_job {node_list}")

GitHub Actions Pylint check warning on line 325: logging-fstring-interpolation, Use lazy % formatting in logging functions
script = job_dict["jobscript"]
timeout = job_dict["jobtimeout"]
log = job_dict["joblog"]
error_log = job_dict["joberrlog"]
method = launch_jobscript
params = (self.log, job_queue, job_id, job_node_list,
script, log, error_log, timeout, self)
name = f"SOAK JOB {job_id}"

jobs.append(threading.Thread(target=method, args=params, name=name))
jobid_list.remove(job_id)
node_list = node_list[node_count:]
self.log.debug(f"DBG: node_list after launch_job {node_list}")

GitHub Actions Pylint check warning on line 338: logging-fstring-interpolation, Use lazy % formatting in logging functions
# run job scripts on all available nodes
for job in jobs:
job.start()
self.log.debug("DBG: all jobs started")
for job in jobs:
job.join()
self.log.debug("DBG: all jobs joined")
while not job_queue.empty():
job_results = job_queue.get()
# Results to return in queue
node_list.update(job_results["host_list"])
self.log.debug("DBG: Updating soak results")
self.soak_results[job_results["handle"]] = job_results["state"]
self.log.debug(f"DBG: node_list returned from queue {node_list}")

GitHub Actions Pylint check warning on line 352: logging-fstring-interpolation, Use lazy % formatting in logging functions

self.log.debug("DBG: schedule_jobs EXITED ")

def job_setup(self, jobs, pool):

GitHub Actions Pylint check warning on line 356: useless-return, Useless return at end of function or method
"""Create the cmdline needed to launch job.
@@ -304,28 +361,24 @@ def job_setup(self, jobs, pool):
pool (obj): TestPool obj
Returns:
job_cmdlist: list of sbatch scripts that can be launched
by slurm job manager
job_cmdlist: list of dictionary of jobs that can be launched
"""
job_cmdlist = []
self.log.info("<<Job_Setup %s >> at %s", self.test_name, time.ctime())
for job in jobs:
jobscript = []
# list of all job scripts
jobscripts = []
# command is a list of [sbatch_cmds, log_name] to create a single job script
commands = []
nodesperjob = self.params.get(
"nodesperjob", "/run/" + job + "/*", [1])
taskspernode = self.params.get(
"taskspernode", "/run/" + job + "/*", [1])
nodesperjob = self.params.get("nodesperjob", "/run/" + job + "/*", [1])
taskspernode = self.params.get("taskspernode", "/run/" + job + "/*", [1])
for npj in list(nodesperjob):
# nodesperjob = -1 indicates to use all nodes in client hostlist
if npj < 0:
npj = len(self.hostlist_clients)
if len(self.hostlist_clients) / npj < 1:
raise SoakTestError(
"<<FAILED: There are only {} client nodes for this job."
" Job requires {}".format(
len(self.hostlist_clients), npj))
raise SoakTestError(f"<<FAILED: There are only {len(self.hostlist_clients)}"
f" client nodes for this job. Job requires {npj}")
for ppn in list(taskspernode):
if "ior" in job:
commands = create_ior_cmdline(self, job, pool, ppn, npj)
@@ -345,47 +398,71 @@ def job_completion(self, job_id_list):
commands = create_dm_cmdline(self, job, pool, ppn, npj)
else:
raise SoakTestError(f"<<FAILED: Job {job} is not supported. ")
jobscript = build_job_script(self, commands, job, npj, ppn)
job_cmdlist.extend(jobscript)
return job_cmdlist

def job_startup(self, job_cmdlist):
"""Submit job batch script.
jobscripts = build_job_script(self, commands, job, npj, ppn)

# Create a dictionary of all job definitions
for jobscript in jobscripts:
jobtimeout = self.params.get("job_timeout", "/run/" + job + "/*", 10)
self.Job_List.extend([{"jobscript": jobscript[0],
"nodesperjob": npj,
"taskspernode": ppn,
"hostlist": None,
"jobid": None,
"jobtimeout": jobtimeout,
"joblog": jobscript[1],
"joberrlog": jobscript[2]}])
# randomize job list
random.seed(4)
random.shuffle(self.Job_List)
return
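Each element appended to self.Job_List above is a plain dictionary describing one schedulable job. For reference, a single entry might look like the following sketch (paths and numbers are illustrative assumptions, not values taken from this diff):

# Hypothetical Job_List entry as built by job_setup().
example_job = {
    "jobscript": "/share/soak/pass1/ior/job.sh",          # jobscript[0] from build_job_script
    "nodesperjob": 4,
    "taskspernode": 16,
    "hostlist": None,       # filled in when the scheduler assigns nodes
    "jobid": None,          # assigned later by job_startup()
    "jobtimeout": 10,       # job_timeout yaml parameter (default 10)
    "joblog": "/share/soak/pass1/ior/job.log",             # jobscript[1]
    "joberrlog": "/share/soak/pass1/ior/job.error.log",    # jobscript[2]
}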

Args:
job_cmdlist (list): list of jobs to execute
def job_startup(self):
"""Launch the job script.
Returns:
job_id_list: IDs of each job submitted to slurm.
job_id_list: list of job_ids for each job launched.
"""
self.log.info("<<Job Startup - %s >> at %s", self.test_name, time.ctime())
job_id_list = []
# before submitting the jobs to the queue, check the job timeout;
# before starting jobs, check the job timeout;
if time.time() > self.end_time:
self.log.info("<< SOAK test timeout in Job Startup>>")
return job_id_list
# job_cmdlist is a list of batch script files

for script in job_cmdlist:
try:
job_id = slurm_utils.run_slurm_script(self.log, str(script))
except slurm_utils.SlurmFailed as error:
self.log.error(error)
# Force the test to exit with failure
job_id = None
if job_id:
self.log.info(
"<<Job %s started with %s >> at %s",
job_id, script, time.ctime())
slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout)
# keep a list of the job_id's
job_id_list.append(int(job_id))
else:
# one of the jobs failed to queue; exit on first fail for now.
err_msg = f"Slurm failed to submit job for {script}"
job_id_list = []
raise SoakTestError(f"<<FAILED: Soak {self.test_name}: {err_msg}>>")
if self.job_scheduler == "slurm":
for job_dict in self.Job_List:
script = job_dict["jobscript"]
try:
job_id = slurm_utils.run_slurm_script(self.log, str(script))
except slurm_utils.SlurmFailed as error:
self.log.error(error)
# Force the test to exit with failure
job_id = None
if job_id:
self.log.info(
"<<Job %s started with %s >> at %s", job_id, script, time.ctime())
slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout)
# Update Job_List with the job_id
job_dict["job_id"] = int(job_id)
job_id_list.append(int(job_id))
else:
# one of the jobs failed to queue; exit on first fail for now.
err_msg = f"Job failed to run for {script}"
job_id_list = []
raise SoakTestError(f"<<FAILED: Soak {self.test_name}: {err_msg}>>")
else:
for job_dict in self.Job_List:
job_dict["jobid"] = get_id()
job_id_list.append(job_dict["jobid"])

# self.schedule_jobs()
method = self.schedule_jobs
name = "Job Scheduler"
scheduler = threading.Thread(target=method, name=name)
# scheduler = multiprocessing.Process(target=method, name=name)
scheduler.start()

return job_id_list
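With the internal scheduler, job ids come from get_id() and the actual dispatch happens on the background schedule_jobs thread, so the ids only need to be unique within the pass. A trivial stand-in for get_id(), shown purely to illustrate the contract (the real helper is imported from the soak utilities and may be implemented differently):

# Illustrative stand-in for get_id(); not the actual soak_utils implementation.
import itertools

_job_ids = itertools.count(1)

def get_id():
    """Return the next unique internal job id."""
    return next(_job_ids)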

def job_completion(self, job_id_list):
@@ -397,8 +474,7 @@ def job_completion(self, job_id_list):
failed_job_id_list: IDs of each job that failed in slurm
"""
self.log.info(
"<<Job Completion - %s >> at %s", self.test_name, time.ctime())
self.log.info("<<Job Completion - %s >> at %s", self.test_name, time.ctime())
harasser_interval = 0
failed_harasser_msg = None
harasser_timer = time.time()
Expand All @@ -407,21 +483,27 @@ def job_completion(self, job_id_list):
since = journalctl_time()
# loop time exists after the first pass; no harassers in the first pass
if self.harasser_loop_time and self.harassers:
harasser_interval = self.harasser_loop_time / (
len(self.harassers) + 1)
harasser_interval = self.harasser_loop_time / (len(self.harassers) + 1)
# If there is nothing to do; exit
if job_id_list:

GitHub Actions Pylint check warning on line 488: too-many-nested-blocks, Too many nested blocks (6/5)
# wait for all the jobs to finish
while len(self.soak_results) < len(job_id_list):
# wait for the jobs to complete.
# enter tearDown before hitting the avocado timeout
self.log.debug(f"DBG: SOAK RESULTS 1 {self.soak_results}")

GitHub Actions Pylint check warning on line 491: logging-fstring-interpolation, Use lazy % formatting in logging functions
# wait for the jobs to complete unless test_timeout occurred
if time.time() > self.end_time:
self.log.info(
"<< SOAK test timeout in Job Completion at %s >>",
time.ctime())
for job in job_id_list:
if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed:
self.fail(f"Error canceling Job {job}")
self.log.info("<< SOAK test timeout in Job Completion at %s >>", time.ctime())
if self.job_scheduler == "slurm":
for job in job_id_list:
if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed:
self.fail(f"Error canceling Job {job}")
else:
# update soak_results to include job id NOT run and set state = CANCELLED
for job in job_id_list:
if job not in list(self.soak_results.keys()):
self.soak_results.update({job: "CANCELLED"})
self.log.info("FINAL STATE: soak job %s completed with : %s at %s",
job, "CANCELLED", time.ctime())
break
# monitor events every 15 min
if datetime.now() > check_time:
run_monitor_check(self)
@@ -456,12 +538,12 @@ def job_completion(self, job_id_list):
if failed_harasser_msg is not None:
self.all_failed_harassers.append(failed_harasser_msg)
# check for JobStatus = COMPLETED or CANCELLED (i.e. TEST TO)
self.log.debug(f"DBG: SOAK RESULTS 2 {self.soak_results}")
for job, result in list(self.soak_results.items()):
if result in ["COMPLETED", "CANCELLED"]:
job_id_list.remove(int(job))
else:
self.log.info(
"<< Job %s failed with status %s>>", job, result)
self.log.info("<< Job %s failed with status %s>>", job, result)
# gather all the logfiles for this pass and cleanup test nodes
cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/"
cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}"
@@ -472,6 +554,15 @@ def job_completion(self, job_id_list):
result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600)
if not result.passed:
self.log.error("Remote copy failed on %s", str(result.failed_hosts))
# copy script files from shared dir
sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop)
cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/"
cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}"
try:
run_local(self.log, cmd3, timeout=600)
run_local(self.log, cmd4, timeout=600)
except RunException as error:
self.log.info("Script file copy failed with %s", error)
# copy the local files; local host not included in hostlist_client
try:
run_local(self.log, cmd, timeout=600)
@@ -501,6 +592,8 @@ def execute_jobs(self, jobs, pools):
"""
job_script_list = []

GitHub Actions Flake8 check failure on line 594: F841 local variable 'job_script_list' is assigned to but never used
jobid_list = []
self.Job_List = []
# Update the remote log directories from new loop/pass
sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop)
outputsoaktest_dir = self.outputsoak_dir + "/pass" + str(self.loop)
Expand All @@ -520,18 +613,15 @@ def execute_jobs(self, jobs, pools):
else:
self.soak_log_dir = sharedsoaktest_dir
# create the batch scripts
job_script_list = self.job_setup(jobs, pools)
# randomize job list
random.seed(4)
random.shuffle(job_script_list)
self.job_setup(jobs, pools)
# Gather the job_ids
job_id_list = self.job_startup(job_script_list)
jobid_list = self.job_startup()
# Initialize the failed_job_list to job_list so that any
# unexpected failures will clear the squeue in tearDown
self.failed_job_id_list = job_id_list
self.failed_job_id_list = jobid_list

# Wait for jobs to finish and cancel/kill jobs if necessary
self.failed_job_id_list = self.job_completion(job_id_list)
self.failed_job_id_list = self.job_completion(jobid_list)
# Log the failing job ID
if self.failed_job_id_list:
self.log.info(
@@ -550,6 +640,7 @@ def run_soak(self, test_param):
"""
self.soak_results = {}
self.Job_List = []
self.pool = []
self.container = []
self.harasser_results = {}
(The diff for the third changed file is not shown.)
