From 47e9f7cdf5c60ede5b54360b87f4f44bf4eab92c Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Tue, 16 Jul 2024 10:26:22 -0400 Subject: [PATCH 01/25] DAOS-16167 test: update soak test to use internal job scheduler Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/job_manager_utils.py | 1 + src/tests/ftest/util/soak_test_base.py | 225 ++++++++++++++------- src/tests/ftest/util/soak_utils.py | 232 ++++++++++++++++++---- 3 files changed, 348 insertions(+), 110 deletions(-) diff --git a/src/tests/ftest/util/job_manager_utils.py b/src/tests/ftest/util/job_manager_utils.py index 2b5f2cd6c26..afca5df86f1 100644 --- a/src/tests/ftest/util/job_manager_utils.py +++ b/src/tests/ftest/util/job_manager_utils.py @@ -473,6 +473,7 @@ def __init__(self, job, subprocess=False, mpi_type="openmpi"): self.tmpdir_base = FormattedParameter("--mca orte_tmpdir_base {}", None) self.args = BasicParameter(None, None) self.mpi_type = mpi_type + self.hostlist = FormattedParameter("-hosts {}", None) def assign_hosts(self, hosts, path=None, slots=None, hostfile=True): """Assign the hosts to use with the command (-f). diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index a97bb2094eb..b8d34c00695 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -26,9 +26,10 @@ create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, create_racer_cmdline, ddhhmmss_format, get_daos_server_logs, get_harassers, - get_journalctl, launch_exclude_reintegrate, launch_extend, launch_reboot, - launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, - reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) + get_id, get_journalctl, launch_exclude_reintegrate, launch_extend, + launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot, + launch_vmd_identify_check, reserved_file_copy, run_event_check, + run_metrics_check, run_monitor_check) class SoakTestBase(TestWithServers): @@ -78,6 +79,8 @@ def __init__(self, *args, **kwargs): self.soak_log_dir = None self.soak_dir = None self.enable_scrubber = False + self.job_scheduler = None + self.Job_List = None def setUp(self): """Define test setup to be done.""" @@ -96,6 +99,7 @@ def setUp(self): self.sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) # Initialize dmg cmd self.dmg_command = self.get_dmg_command() + self.job_scheduler = self.params.get("job_scheduler", "/run/*", default="slurm") # Fail if slurm partition is not defined # NOTE: Slurm reservation and partition are created before soak runs. # CI uses partition=daos_client and no reservation. 
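A note on the new Mpirun.hostlist parameter added in job_manager_utils.py above: a FormattedParameter only renders its template once a value has been assigned, so the later calls to mpirun_cmd.hostlist.update("$HOSTLIST") in soak_utils.py expand to "-hosts $HOSTLIST" on the generated mpirun command line, and the flag is simply omitted while the value stays None. The following is a minimal sketch of that behaviour using a simplified stand-in class; the real FormattedParameter lives in command_utils_base.py and is not part of this patch.

    # Hypothetical, simplified stand-in for the framework's FormattedParameter;
    # illustration of the rendering behaviour only, not the DAOS implementation.
    class FormattedParameter:
        def __init__(self, str_format, default=None):
            self._format = str_format
            self.value = default

        def update(self, value):
            self.value = value

        def __str__(self):
            # Render nothing until a value has been assigned.
            return self._format.format(self.value) if self.value is not None else ""

    hostlist = FormattedParameter("-hosts {}", None)
    print(str(hostlist))              # "" -> flag omitted from the command line
    hostlist.update("$HOSTLIST")
    print(str(hostlist))              # "-hosts $HOSTLIST"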
@@ -132,7 +136,7 @@ def pre_tear_down(self): self.log.info("<> at %s", time.ctime()) errors = [] # clear out any jobs in squeue; - if self.failed_job_id_list: + if self.failed_job_id_list and self.job_scheduler == "slurm": job_id = " ".join([str(job) for job in self.failed_job_id_list]) self.log.info("<>", job_id) cmd = "scancel --partition {} -u {} {}".format( @@ -296,6 +300,59 @@ def harasser_job_done(self, args): self.harasser_results[args["name"]] = args["status"] self.harasser_args[args["name"]] = args["vars"] + def schedule_jobs(self): + """Schedule jobs with internal scheduler.""" + self.log.debug("DBG: schedule_jobs ENTERED ") + job_queue = multiprocessing.Queue() + jobid_list = [] + node_list = self.hostlist_clients + for job_dict in self.Job_List: + jobid_list.append(job_dict["jobid"]) + self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") + while True: + if time.time() > self.end_time or len(jobid_list) == 0: + break + jobs = [] + job_results = {} + for job_dict in self.Job_List: + job_id = job_dict["jobid"] + if job_id in jobid_list: + node_count = job_dict["nodesperjob"] + if len(node_list) >= node_count: + self.log.debug(f"DBG: node_count {node_count}") + self.log.debug(f"DBG: node_list initial/queue {node_list}") + job_node_list = node_list[:node_count] + self.log.debug(f"DBG: node_list before launch_job {node_list}") + script = job_dict["jobscript"] + timeout = job_dict["jobtimeout"] + log = job_dict["joblog"] + error_log = job_dict["joberrlog"] + method = launch_jobscript + params = (self.log, job_queue, job_id, job_node_list, + script, log, error_log, timeout, self) + name = f"SOAK JOB {job_id}" + + jobs.append(threading.Thread(target=method, args=params, name=name)) + jobid_list.remove(job_id) + node_list = node_list[node_count:] + self.log.debug(f"DBG: node_list after launch_job {node_list}") + # run job scripts on all available nodes + for job in jobs: + job.start() + self.log.debug("DBG: all jobs started") + for job in jobs: + job.join() + self.log.debug("DBG: all jobs joined") + while not job_queue.empty(): + job_results = job_queue.get() + # Results to return in queue + node_list.update(job_results["host_list"]) + self.log.debug("DBG: Updating soak results") + self.soak_results[job_results["handle"]] = job_results["state"] + self.log.debug(f"DBG: node_list returned from queue {node_list}") + + self.log.debug("DBG: schedule_jobs EXITED ") + def job_setup(self, jobs, pool): """Create the cmdline needed to launch job. 
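The schedule_jobs method added above is a simple first-fit scheduler: it walks the job list, carves nodesperjob hosts off the front of the free-node list, runs each job script in its own thread, and returns the hosts to the pool when the worker posts its result on a shared queue. The sketch below shows that pattern in isolation with generic names; it is not the soak framework API (the real code uses launch_jobscript, a multiprocessing.Queue, and an end-time check in place of the break shown here).

    # Minimal first-fit scheduler sketch; worker() and the job dicts are placeholders.
    import queue
    import threading

    def worker(result_queue, job_id, nodes):
        # ... run the job script on "nodes", then report back ...
        result_queue.put({"handle": job_id, "state": "COMPLETED", "host_list": nodes})

    def schedule(jobs, free_nodes):
        results = {}
        result_queue = queue.Queue()
        pending = {job["jobid"] for job in jobs}
        while pending:
            threads = []
            for job in jobs:
                need = job["nodesperjob"]
                if job["jobid"] in pending and len(free_nodes) >= need:
                    nodes, free_nodes = free_nodes[:need], free_nodes[need:]
                    threads.append(threading.Thread(
                        target=worker, args=(result_queue, job["jobid"], nodes)))
                    pending.discard(job["jobid"])
            if not threads:
                break                          # nothing fits; stop instead of spinning
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            while not result_queue.empty():
                result = result_queue.get()
                results[result["handle"]] = result["state"]
                free_nodes.extend(result["host_list"])   # give the nodes back to the pool
        return results

    jobs = [{"jobid": 1, "nodesperjob": 2}, {"jobid": 2, "nodesperjob": 2}]
    print(schedule(jobs, ["node-1", "node-2", "node-3"]))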
@@ -304,28 +361,24 @@ def job_setup(self, jobs, pool): pool (obj): TestPool obj Returns: - job_cmdlist: list of sbatch scripts that can be launched - by slurm job manager + job_cmdlist: list of dictionary of jobs that can be launched """ - job_cmdlist = [] self.log.info("<> at %s", self.test_name, time.ctime()) for job in jobs: - jobscript = [] + # list of all job scripts + jobscripts = [] + # command is a list of [sbatch_cmds, log_name] to create a single job script commands = [] - nodesperjob = self.params.get( - "nodesperjob", "/run/" + job + "/*", [1]) - taskspernode = self.params.get( - "taskspernode", "/run/" + job + "/*", [1]) + nodesperjob = self.params.get("nodesperjob", "/run/" + job + "/*", [1]) + taskspernode = self.params.get("taskspernode", "/run/" + job + "/*", [1]) for npj in list(nodesperjob): # nodesperjob = -1 indicates to use all nodes in client hostlist if npj < 0: npj = len(self.hostlist_clients) if len(self.hostlist_clients) / npj < 1: - raise SoakTestError( - "<> at %s", self.test_name, time.ctime()) job_id_list = [] - # before submitting the jobs to the queue, check the job timeout; + # before starting jobs, check the job timeout; if time.time() > self.end_time: self.log.info("<< SOAK test timeout in Job Startup>>") return job_id_list - # job_cmdlist is a list of batch script files - for script in job_cmdlist: - try: - job_id = slurm_utils.run_slurm_script(self.log, str(script)) - except slurm_utils.SlurmFailed as error: - self.log.error(error) - # Force the test to exit with failure - job_id = None - if job_id: - self.log.info( - "<> at %s", - job_id, script, time.ctime()) - slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout) - # keep a list of the job_id's - job_id_list.append(int(job_id)) - else: - # one of the jobs failed to queue; exit on first fail for now. - err_msg = f"Slurm failed to submit job for {script}" - job_id_list = [] - raise SoakTestError(f"<>") + if self.job_scheduler == "slurm": + for job_dict in self.Job_List: + script = job_dict["jobscript"] + try: + job_id = slurm_utils.run_slurm_script(self.log, str(script)) + except slurm_utils.SlurmFailed as error: + self.log.error(error) + # Force the test to exit with failure + job_id = None + if job_id: + self.log.info( + "<> at %s", job_id, script, time.ctime()) + slurm_utils.register_for_job_results(job_id, self, max_wait=self.test_timeout) + # Update Job_List with the job_id + job_dict["job_id"] = int(job_id) + job_id_list.append(int(job_id)) + else: + # one of the jobs failed to queue; exit on first fail for now. 
+ err_msg = f"Job failed to run for {script}" + job_id_list = [] + raise SoakTestError(f"<>") + else: + for job_dict in self.Job_List: + job_dict["jobid"] = get_id() + job_id_list.append(job_dict["jobid"]) + + # self.schedule_jobs() + method = self.schedule_jobs + name = "Job Scheduler" + scheduler = threading.Thread(target=method, name=name) + # scheduler = multiprocessing.Process(target=method, name=name) + scheduler.start() + return job_id_list def job_completion(self, job_id_list): @@ -397,8 +474,7 @@ def job_completion(self, job_id_list): failed_job_id_list: IDs of each job that failed in slurm """ - self.log.info( - "<> at %s", self.test_name, time.ctime()) + self.log.info("<> at %s", self.test_name, time.ctime()) harasser_interval = 0 failed_harasser_msg = None harasser_timer = time.time() @@ -407,21 +483,27 @@ def job_completion(self, job_id_list): since = journalctl_time() # loop time exists after the first pass; no harassers in the first pass if self.harasser_loop_time and self.harassers: - harasser_interval = self.harasser_loop_time / ( - len(self.harassers) + 1) + harasser_interval = self.harasser_loop_time / (len(self.harassers) + 1) # If there is nothing to do; exit if job_id_list: # wait for all the jobs to finish while len(self.soak_results) < len(job_id_list): - # wait for the jobs to complete. - # enter tearDown before hitting the avocado timeout + self.log.debug(f"DBG: SOAK RESULTS 1 {self.soak_results}") + # wait for the jobs to complete unless test_timeout occurred if time.time() > self.end_time: - self.log.info( - "<< SOAK test timeout in Job Completion at %s >>", - time.ctime()) - for job in job_id_list: - if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: - self.fail(f"Error canceling Job {job}") + self.log.info("<< SOAK test timeout in Job Completion at %s >>", time.ctime()) + if self.job_scheduler == "slurm": + for job in job_id_list: + if not slurm_utils.cancel_jobs(self.log, self.control, int(job)).passed: + self.fail(f"Error canceling Job {job}") + else: + # update soak_results to include job id NOT run and set state = CANCELLED + for job in job_id_list: + if job not in list(self.soak_results.keys()): + self.soak_results.update({job: "CANCELLED"}) + self.log.info("FINAL STATE: soak job %s completed with : %s at %s", + job, "CANCELLED", time.ctime()) + break # monitor events every 15 min if datetime.now() > check_time: run_monitor_check(self) @@ -456,12 +538,12 @@ def job_completion(self, job_id_list): if failed_harasser_msg is not None: self.all_failed_harassers.append(failed_harasser_msg) # check for JobStatus = COMPLETED or CANCELLED (i.e. 
TEST TO) + self.log.debug(f"DBG: SOAK RESULTS 2 {self.soak_results}") for job, result in list(self.soak_results.items()): if result in ["COMPLETED", "CANCELLED"]: job_id_list.remove(int(job)) else: - self.log.info( - "<< Job %s failed with status %s>>", job, result) + self.log.info("<< Job %s failed with status %s>>", job, result) # gather all the logfiles for this pass and cleanup test nodes cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" @@ -472,6 +554,15 @@ def job_completion(self, job_id_list): result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) if not result.passed: self.log.error("Remote copy failed on %s", str(result.failed_hosts)) + # copy script files from shared dir + sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop) + cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/" + cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}" + try: + run_local(self.log, cmd3, timeout=600) + run_local(self.log, cmd4, timeout=600) + except RunException as error: + self.log.info("Script file copy failed with %s", error) # copy the local files; local host not included in hostlist_client try: run_local(self.log, cmd, timeout=600) @@ -501,6 +592,8 @@ def execute_jobs(self, jobs, pools): """ job_script_list = [] + jobid_list = [] + self.Job_List = [] # Update the remote log directories from new loop/pass sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) outputsoaktest_dir = self.outputsoak_dir + "/pass" + str(self.loop) @@ -520,18 +613,15 @@ def execute_jobs(self, jobs, pools): else: self.soak_log_dir = sharedsoaktest_dir # create the batch scripts - job_script_list = self.job_setup(jobs, pools) - # randomize job list - random.seed(4) - random.shuffle(job_script_list) + self.job_setup(jobs, pools) # Gather the job_ids - job_id_list = self.job_startup(job_script_list) + jobid_list = self.job_startup() # Initialize the failed_job_list to job_list so that any # unexpected failures will clear the squeue in tearDown - self.failed_job_id_list = job_id_list + self.failed_job_id_list = jobid_list # Wait for jobs to finish and cancel/kill jobs if necessary - self.failed_job_id_list = self.job_completion(job_id_list) + self.failed_job_id_list = self.job_completion(jobid_list) # Log the failing job ID if self.failed_job_id_list: self.log.info( @@ -550,6 +640,7 @@ def run_soak(self, test_param): """ self.soak_results = {} + self.Job_List = [] self.pool = [] self.container = [] self.harasser_results = {} diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 1f4aad41665..0653415b466 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -8,13 +8,14 @@ import os import random import re +import stat import threading import time -from itertools import product +from itertools import count, product -import slurm_utils from avocado.core.exceptions import TestFail from avocado.utils.distro import detect +from ClusterShell.NodeSet import NodeSet from command_utils_base import EnvironmentVariables from daos_racer_utils import DaosRacerCommand from data_mover_utils import DcpCommand, FsCopy @@ -36,6 +37,7 @@ from test_utils_container import add_container H_LOCK = threading.Lock() +id_counter = count(start=1) def ddhhmmss_format(seconds): @@ -56,6 +58,10 @@ def ddhhmmss_format(seconds): "%H:%M:%S", time.gmtime(seconds % 86400))) +def get_id(): + return next(id_counter) + + def add_pools(self, 
pool_names, ranks=None): """Create a list of pools that the various tests use for storage. @@ -337,6 +343,79 @@ def wait_for_pool_rebuild(self, pool, name): return rebuild_status +def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_log, timeout, test): + """_summary_ + + Args: + log (_type_): _description_ + job_queue (_type_): _description_ + job_id (_type_): _description_ + host_list (_type_): _description_ + script (_type_): _description_ + job_log (_type_): _description_ + error_log (_type_): _description_ + timeout (_type_): _description_ + test (_type_): _description_ + """ + log.debug(f"DBG: JOB {job_id} ENTERED launch_jobscript") + job_results = [] + node_results = [] + state = "UNKNOWN" + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + return + if isinstance(host_list, str): + # assume one host in list + hosts = host_list + rhost = host_list + else: + hosts = ",".join(sorted(host_list)) + rhost = NodeSet(hosts)[0] + job_log1 = job_log.replace("JOBID", str(job_id)) + error_log1 = error_log.replace("JOBID", str(job_id)) + joblog = job_log1.replace("RHOST", str(rhost)) + errorlog = error_log1.replace("RHOST", str(rhost)) + cmd = f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}" + job_results = run_remote( + log, rhost, cmd, verbose=True, timeout=timeout * 60, task_debug=True, stderr=True) + if job_results: + if job_results.timeout: + state = "TIMEOUT" + elif job_results.passed: + state = "COMPLETED" + elif not job_results.passed: + state = "FAILED" + else: + state = "UNKNOWN" + if time.time() >= test.end_time: + results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} + log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + return + + # check if all nodes are available + cmd = "hostname -s" + node_results = run_remote(log, NodeSet(hosts), cmd, verbose=False) + if node_results.failed_hosts: + for node in node_results.failed_hosts: + host_list.remove(node) + log.debug("DBG: Node {node} is marked as DOWN in job {job_id}") + + log.info("FINAL STATE: soak job %s completed with : %s at %s", job_id, state, time.ctime()) + results = {"handle": job_id, "state": state, "host_list": host_list} + log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + job_queue.put(results) + # give time to update the queue before exiting + time.sleep(0.5) + return + + def launch_snapshot(self, pool, name): """Create a basic snapshot of the reserved pool. 
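launch_jobscript above builds a single shell command per job: it substitutes the JOBID and RHOST placeholders in the log-file templates, picks the first host as the remote launch point, and redirects stdout/stderr of the generated script into the per-job logs before reporting a {"handle", "state", "host_list"} result on the queue. The small illustration below shows only the name/command assembly; the paths and host names are made up for the example, and the first-host selection stands in for the NodeSet indexing used in the patch.

    # Illustration of the per-job command assembly in launch_jobscript; example values only.
    job_id = 7
    host_list = ["node-3", "node-1", "node-2"]
    job_log = "/tmp/soak/pass1/soak_smoke_ior_RHOST_JOBID_abcde"
    error_log = "/tmp/soak/pass1/soak_smoke_ior_RHOST_JOBID_ERROR_abcde"
    script = "/tmp/soak/pass1/jobscript_abcde.sh"

    hosts = ",".join(sorted(host_list))        # "node-1,node-2,node-3" passed as $1
    rhost = sorted(host_list)[0]               # script is launched from the first host
    joblog = job_log.replace("JOBID", str(job_id)).replace("RHOST", rhost)
    errorlog = error_log.replace("JOBID", str(job_id)).replace("RHOST", rhost)
    cmd = f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}"
    print(cmd)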
@@ -858,16 +937,16 @@ def start_dfuse(self, pool, container, name=None, job_spec=None): dfuselog = os.path.join( self.soak_log_dir, self.test_name + "_" + name + "_`hostname -s`_" - "" + "${SLURM_JOB_ID}_" + "daos_dfuse.log") + "" + "${JOB_ID}_" + "daos_dfuse.log") dfuse_env = f"export D_LOG_FILE_APPEND_PID=1;export D_LOG_MASK=ERR;export D_LOG_FILE={dfuselog}" module_load = f"module use {self.mpi_module_use};module load {self.mpi_module}" dfuse_start_cmds = [ - "clush -S -w $SLURM_JOB_NODELIST \"mkdir -p {}\"".format(dfuse.mount_dir.value), - "clush -S -w $SLURM_JOB_NODELIST \"cd {};{};{};{}\"".format( + "clush -S -w $HOSTLIST \"mkdir -p {}\"".format(dfuse.mount_dir.value), + "clush -S -w $HOSTLIST \"cd {};{};{};{}\"".format( dfuse.mount_dir.value, dfuse_env, module_load, str(dfuse)), "sleep 10", - "clush -S -w $SLURM_JOB_NODELIST \"df -h {}\"".format(dfuse.mount_dir.value), + "clush -S -w $HOSTLIST \"df -h {}\"".format(dfuse.mount_dir.value), ] return dfuse, dfuse_start_cmds @@ -889,8 +968,8 @@ def stop_dfuse(dfuse, vol=False): dfuse.mount_dir.value)]) dfuse_stop_cmds.extend([ - "clush -S -w $SLURM_JOB_NODELIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value), - "clush -S -w $SLURM_JOB_NODELIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)]) + "clush -S -w $HOSTLIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value), + "clush -S -w $HOSTLIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)]) return dfuse_stop_cmds @@ -986,7 +1065,7 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, file_dir_oclass[0], nodesperjob * ppn, nodesperjob, ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name + "_" + log_name - + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_`hostname -s`_${JOB_ID}_daos.log") env = ior_cmd.get_default_env("mpirun", log_file=daos_log) env["D_LOG_FILE_APPEND_PID"] = "1" sbatch_cmds = [f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"] @@ -1013,6 +1092,7 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["HDF5-VOL", "POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: @@ -1058,13 +1138,13 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob): job_spec, api, file_oclass, nodesperjob * ppn, nodesperjob, ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_daos.log") macsio_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-log.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-log.log") macsio_timing_log = os.path.join( self.soak_log_dir, self.test_name - + "_" + log_name + "_`hostname -s`_${SLURM_JOB_ID}_macsio-timing.log") + + "_" + log_name + "_`hostname -s`_${JOB_ID}_macsio-timing.log") macsio.log_file_name.update(macsio_log) macsio.timings_file_name.update(macsio_timing_log) env = macsio.env.copy() @@ -1086,6 +1166,7 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.working_dir.update(dfuse.mount_dir.value) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["HDF5-VOL"]: @@ 
-1157,7 +1238,7 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): ppn) daos_log = os.path.join( self.soak_log_dir, self.test_name + "_" + log_name - + "_`hostname -s`_${SLURM_JOB_ID}_daos.log") + + "_`hostname -s`_${JOB_ID}_daos.log") env = mdtest_cmd.get_default_env("mpirun", log_file=daos_log) env["D_LOG_FILE_APPEND_PID"] = "1" sbatch_cmds = [f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"] @@ -1179,6 +1260,7 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(env, True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") if api in ["POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: @@ -1214,7 +1296,7 @@ def create_racer_cmdline(self, job_spec): racer_log = os.path.join( self.soak_log_dir, self.test_name + "_" + job_spec + "_`hostname -s`_" - "${SLURM_JOB_ID}_" + "racer_log") + "${JOB_ID}_" + "racer_log") daos_racer.env["D_LOG_FILE"] = get_log_file(racer_log) log_name = job_spec cmds = [] @@ -1383,6 +1465,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_environment(env, True) mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]: mpirun_cmd.working_dir.update(dfuse.mount_dir.value) cmdline = str(mpirun_cmd) @@ -1433,7 +1516,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): dcp_cmd.set_params(src=src_file, dst=dst_file) env_vars = { "D_LOG_FILE": os.path.join(self.soak_log_dir, self.test_name + "_" - + log_name + "_`hostname -s`_${SLURM_JOB_ID}_daos.log"), + + log_name + "_`hostname -s`_${JOB_ID}_daos.log"), "D_LOG_FILE_APPEND_PID": "1" } mpirun_cmd = Mpirun(dcp_cmd, mpi_type=self.mpi_module) @@ -1441,6 +1524,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): mpirun_cmd.assign_processes(nodesperjob * ppn) mpirun_cmd.assign_environment(EnvironmentVariables(env_vars), True) mpirun_cmd.ppn.update(ppn) + mpirun_cmd.hostlist.update("$HOSTLIST") sbatch_cmds.append(str(mpirun_cmd)) sbatch_cmds.append("status=$?") @@ -1456,52 +1540,114 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): def build_job_script(self, commands, job, nodesperjob, ppn): - """Create a slurm batch script that will execute a list of cmdlines. + """Generate a script that will execute a list of commands. Args: - self (obj): soak obj - commands(list): command lines and cmd specific log_name - job(str): the job name that will be defined in the slurm script + path (str): where to write the script file + name (str): job name + output (str): where to put the output (full path) + nodecount (int): number of compute nodes to execute on + cmds (list): shell commands that are to be executed + uniq (str): a unique string to append to the job and log files + sbatch_params (dict, optional): dictionary containing other less often used parameters to + sbatch, e.g. mem:100. Defaults to None. 
+ + Raises: + SoakTestError: if missing require parameters for the job script Returns: - script_list: list of slurm batch scripts + str: the full path of the script """ - job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) - self.log.info("<> at %s", time.ctime()) + self.log.info("<> at %s", time.ctime()) script_list = [] - # if additional cmds are needed in the batch script + # Additional commands needed in the job script prepend_cmds = ["set +e", "echo Job_Start_Time `date \\+\"%Y-%m-%d %T\"`", "daos pool query {} ".format(self.pool[1].identifier), "daos pool query {} ".format(self.pool[0].identifier)] + append_cmds = ["daos pool query {} ".format(self.pool[1].identifier), "daos pool query {} ".format(self.pool[0].identifier), "echo Job_End_Time `date \\+\"%Y-%m-%d %T\"`"] exit_cmd = ["exit $status"] - # Create the sbatch script for each list of cmdlines + for cmd, log_name in commands: - if isinstance(cmd, str): - cmd = [cmd] - output = os.path.join( - self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_") - error = os.path.join(str(output) + "ERROR_") - sbatch = { - "time": str(job_timeout) + ":00", - "exclude": str(self.slurm_exclude_nodes), - "error": str(error), - "export": "ALL", - "exclusive": None, - "ntasks": str(nodesperjob * ppn) - } - # include the cluster specific params - sbatch.update(self.srun_params) unique = get_random_string(5, self.used) - script = slurm_utils.write_slurm_script( - self.soak_log_dir, job, output, nodesperjob, - prepend_cmds + cmd + append_cmds + exit_cmd, unique, sbatch) - script_list.append(script) self.used.append(unique) + if isinstance(cmd, str): + cmd = [cmd] + if self.job_scheduler == "slurm": + job_timeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) + job_log = os.path.join( + self.soak_log_dir, self.test_name + "_" + log_name + "_%N_" + "%j_") + output = job_log + unique + error = job_log + "ERROR_" + unique + sbatch_params = { + "time": str(job_timeout) + ":00", + "exclude": str(self.slurm_exclude_nodes), + "error": str(error), + "export": "ALL", + "exclusive": None, + "ntasks": str(nodesperjob * ppn) + } + # include the cluster specific params + sbatch_params.update(self.srun_params) + else: + job_log = os.path.join( + self.soak_log_dir, self.test_name + "_" + log_name + "_RHOST" + "_JOBID_") + output = job_log + unique + error = job_log + "ERROR_" + unique + + job_cmds = prepend_cmds + cmd + append_cmds + exit_cmd + # Write script file to shared dir + sharedscript_dir = self.sharedsoak_dir + "/pass" + str(self.loop) + scriptfile = sharedscript_dir + '/jobscript' + "_" + str(unique) + ".sh" + with open(scriptfile, 'w') as script_file: + script_file.write("#!/bin/bash\n#\n") + if self.job_scheduler == "slurm": + # write the slurm directives in the job script + script_file.write("#SBATCH --job-name={}\n".format(job)) + script_file.write("#SBATCH --nodes={}\n".format(nodesperjob)) + script_file.write("#SBATCH --distribution=cyclic\n") + script_file.write("#SBATCH --output={}\n".format(output)) + if sbatch_params: + for key, value in list(sbatch_params.items()): + if value is not None: + if key == "error": + value = value + script_file.write("#SBATCH --{}={}\n".format(key, value)) + else: + script_file.write("#SBATCH --{}\n".format(key)) + script_file.write("\n") + script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") + script_file.write(" echo \"VIRTUAL_ENV not defined\" \n") + script_file.write("else \n") + script_file.write(" source $VIRTUAL_ENV/bin/activate \n") + script_file.write("fi \n") 
+ script_file.write("HOSTLIST=`nodeset -e -S \",\" $SLURM_JOB_NODELIST` \n") + script_file.write("JOB_ID=$SLURM_JOB_ID \n") + script_file.write("echo \"SLURM NODES: \" $SLURM_JOB_NODELIST \n") + script_file.write("echo \"NODE COUNT: \" $SLURM_JOB_NUM_NODES \n") + script_file.write("echo \"JOB ID: \" $JOB_ID \n") + script_file.write("echo \"HOSTLIST: \" $HOSTLIST \n") + script_file.write("\n") + else: + script_file.write("HOSTLIST=$1 \n") + script_file.write("JOB_ID=$2 \n") + script_file.write("echo \"JOB NODES: \" $HOSTLIST \n") + script_file.write("echo \"JOB ID: \" $JOB_ID \n") + script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") + script_file.write(" echo \"VIRTUAL_ENV not defined\" \n") + script_file.write("else \n") + script_file.write(" source $VIRTUAL_ENV/bin/activate \n") + script_file.write("fi \n") + + for cmd in list(job_cmds): + script_file.write(cmd + "\n") + script_file.close() + os.chmod(scriptfile, stat.S_IXUSR | stat.S_IRUSR) + script_list.append([scriptfile, output, error]) return script_list From feac076368c2d2d848b73c4e81c6b71c3501b5a6 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Tue, 23 Jul 2024 19:41:51 -0400 Subject: [PATCH 02/25] missing commit Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 124 ++++++++++++------------- src/tests/ftest/util/soak_utils.py | 81 ++++++++++++---- 2 files changed, 121 insertions(+), 84 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index b8d34c00695..8f479ccfe50 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -25,11 +25,13 @@ from soak_utils import (SoakTestError, add_pools, build_job_script, cleanup_dfuse, create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, - create_racer_cmdline, ddhhmmss_format, get_daos_server_logs, get_harassers, - get_id, get_journalctl, launch_exclude_reintegrate, launch_extend, - launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot, - launch_vmd_identify_check, reserved_file_copy, run_event_check, - run_metrics_check, run_monitor_check) + create_racer_cmdline, ddhhmmss_format, debug_logging, get_daos_server_logs, + get_harassers, get_id, get_job_logs, get_journalctl, + launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot, + launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, + reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) + +J_LOCK = threading.Lock() class SoakTestBase(TestWithServers): @@ -81,6 +83,7 @@ def __init__(self, *args, **kwargs): self.enable_scrubber = False self.job_scheduler = None self.Job_List = None + self.enable_debug_msg = False def setUp(self): """Define test setup to be done.""" @@ -100,30 +103,28 @@ def setUp(self): # Initialize dmg cmd self.dmg_command = self.get_dmg_command() self.job_scheduler = self.params.get("job_scheduler", "/run/*", default="slurm") - # Fail if slurm partition is not defined - # NOTE: Slurm reservation and partition are created before soak runs. - # CI uses partition=daos_client and no reservation. - # A21 uses partition=normal/default and reservation=daos-test. - # Partition and reservation names are updated in the yaml file. - # It is assumed that if there is no reservation (CI only), then all - # the nodes in the partition will be used for soak. 
- if not self.host_info.clients.partition.name: - raise SoakTestError( - "<>") - self.srun_params = {"partition": self.host_info.clients.partition.name} - if self.host_info.clients.partition.reservation: - self.srun_params["reservation"] = self.host_info.clients.partition.reservation - # Include test node for log cleanup; remove from client list + # soak jobs do not run on the local node local_host_list = include_local_host(None) - self.slurm_exclude_nodes.add(local_host_list) if local_host_list[0] in self.hostlist_clients: self.hostlist_clients.remove((local_host_list[0])) if not self.hostlist_clients: - self.fail( - "There are no valid nodes in this partition to run " - "soak. Check partition {} for valid nodes".format( - self.host_info.clients.partition.name)) + self.fail("There are no valid nodes to run soak") + if self.job_scheduler == "slurm": + # Fail if slurm partition is not defined + # NOTE: Slurm reservation and partition are created before soak runs. + # CI uses partition=daos_client and no reservation. + # A21 uses partition=normal/default and reservation=daos-test. + # Partition and reservation names are updated in the yaml file. + # It is assumed that if there is no reservation (CI only), then all + # the nodes in the partition will be used for soak. + if not self.host_info.clients.partition.name: + raise SoakTestError( + "<>") + self.srun_params = {"partition": self.host_info.clients.partition.name} + if self.host_info.clients.partition.reservation: + self.srun_params["reservation"] = self.host_info.clients.partition.reservation + # Include test node for log cleanup; remove from client list + self.slurm_exclude_nodes.add(local_host_list) def pre_tear_down(self): """Tear down any test-specific steps prior to running tearDown(). @@ -169,7 +170,8 @@ def pre_tear_down(self): # display final metrics run_metrics_check(self, prefix="final") - # Gather server logs + # Gather logs + get_job_logs(self) try: get_daos_server_logs(self) except SoakTestError as error: @@ -302,7 +304,7 @@ def harasser_job_done(self, args): def schedule_jobs(self): """Schedule jobs with internal scheduler.""" - self.log.debug("DBG: schedule_jobs ENTERED ") + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs ENTERED ") job_queue = multiprocessing.Queue() jobid_list = [] node_list = self.hostlist_clients @@ -319,10 +321,17 @@ def schedule_jobs(self): if job_id in jobid_list: node_count = job_dict["nodesperjob"] if len(node_list) >= node_count: - self.log.debug(f"DBG: node_count {node_count}") - self.log.debug(f"DBG: node_list initial/queue {node_list}") + debug_logging( + self.log, self.enable_debug_msg, f"DBG: node_count {node_count}") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list initial/queue {node_list}") job_node_list = node_list[:node_count] - self.log.debug(f"DBG: node_list before launch_job {node_list}") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list before launch_job {node_list}") script = job_dict["jobscript"] timeout = job_dict["jobtimeout"] log = job_dict["joblog"] @@ -335,23 +344,28 @@ def schedule_jobs(self): jobs.append(threading.Thread(target=method, args=params, name=name)) jobid_list.remove(job_id) node_list = node_list[node_count:] - self.log.debug(f"DBG: node_list after launch_job {node_list}") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list after launch_job {node_list}") # run job scripts on all available nodes for job in jobs: job.start() - self.log.debug("DBG: all jobs started") + 
debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started") for job in jobs: job.join() - self.log.debug("DBG: all jobs joined") + debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined") while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue node_list.update(job_results["host_list"]) - self.log.debug("DBG: Updating soak results") + debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") self.soak_results[job_results["handle"]] = job_results["state"] - self.log.debug(f"DBG: node_list returned from queue {node_list}") - - self.log.debug("DBG: schedule_jobs EXITED ") + debug_logging( + self.log, + self.enable_debug_msg, + f"DBG: node_list returned from queue {node_list}") + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs EXITED ") def job_setup(self, jobs, pool): """Create the cmdline needed to launch job. @@ -450,7 +464,7 @@ def job_startup(self): # one of the jobs failed to queue; exit on first fail for now. err_msg = f"Job failed to run for {script}" job_id_list = [] - raise SoakTestError(f"<>") + raise SoakTestError(f"<>") else: for job_dict in self.Job_List: job_dict["jobid"] = get_id() @@ -488,7 +502,8 @@ def job_completion(self, job_id_list): if job_id_list: # wait for all the jobs to finish while len(self.soak_results) < len(job_id_list): - self.log.debug(f"DBG: SOAK RESULTS 1 {self.soak_results}") + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 1 {self.soak_results}") # wait for the jobs to complete unless test_timeout occurred if time.time() > self.end_time: self.log.info("<< SOAK test timeout in Job Completion at %s >>", time.ctime()) @@ -538,37 +553,14 @@ def job_completion(self, job_id_list): if failed_harasser_msg is not None: self.all_failed_harassers.append(failed_harasser_msg) # check for JobStatus = COMPLETED or CANCELLED (i.e. 
TEST TO) - self.log.debug(f"DBG: SOAK RESULTS 2 {self.soak_results}") + debug_logging( + self.log, self.enable_debug_msg, f"DBG: SOAK RESULTS 2 {self.soak_results}") for job, result in list(self.soak_results.items()): if result in ["COMPLETED", "CANCELLED"]: job_id_list.remove(int(job)) else: self.log.info("<< Job %s failed with status %s>>", job, result) - # gather all the logfiles for this pass and cleanup test nodes - cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" - cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" - if self.enable_remote_logging: - # Limit fan out to reduce burden on filesystem - result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64) - if result.passed: - result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) - if not result.passed: - self.log.error("Remote copy failed on %s", str(result.failed_hosts)) - # copy script files from shared dir - sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop) - cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/" - cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}" - try: - run_local(self.log, cmd3, timeout=600) - run_local(self.log, cmd4, timeout=600) - except RunException as error: - self.log.info("Script file copy failed with %s", error) - # copy the local files; local host not included in hostlist_client - try: - run_local(self.log, cmd, timeout=600) - run_local(self.log, cmd2, timeout=600) - except RunException as error: - self.log.info("Local copy failed with %s", error) + get_job_logs(self) self.soak_results = {} return job_id_list @@ -591,7 +583,6 @@ def execute_jobs(self, jobs, pools): SoakTestError """ - job_script_list = [] jobid_list = [] self.Job_List = [] # Update the remote log directories from new loop/pass @@ -651,6 +642,7 @@ def run_soak(self, test_param): self.soak_errors = [] self.check_errors = [] self.used = [] + self.enable_debug_msg = self.params.get("enable_debug_msg", "/run/*", default=False) self.mpi_module = self.params.get("mpi_module", "/run/*", default="mpi/mpich-x86_64") self.mpi_module_use = self.params.get( "mpi_module_use", "/run/*", default="/usr/share/modulefiles") diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 0653415b466..3641b3a42fb 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -33,7 +33,7 @@ from mdtest_utils import MdtestCommand from oclass_utils import extract_redundancy_factor from pydaos.raw import DaosApiError, DaosSnapshot -from run_utils import run_remote +from run_utils import RunException, run_local, run_remote from test_utils_container import add_container H_LOCK = threading.Lock() @@ -62,6 +62,18 @@ def get_id(): return next(id_counter) +def debug_logging(log, enable_debug_msg, log_msg): + """_summary_ + + Args: + log (_type_): _description_ + enable_debug_msg (_type_): _description_ + log_msg (_type_): _description_ + """ + if enable_debug_msg: + log.debug(log_msg) + + def add_pools(self, pool_names, ranks=None): """Create a list of pools that the various tests use for storage. 
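Two small helpers appear in soak_utils.py here: get_id hands out monotonically increasing job ids from an itertools.count, and debug_logging only forwards a message to the logger when the enable_debug_msg flag (read from the test yaml in run_soak) is set, so the verbose scheduler traces stay out of normal runs. A minimal usage sketch, with a stock logging.Logger standing in for the avocado test logger:

    # Usage sketch only; a plain logging.Logger stands in for the avocado test log.
    import logging
    from itertools import count

    id_counter = count(start=1)

    def get_id():
        return next(id_counter)

    def debug_logging(log, enable_debug_msg, log_msg):
        if enable_debug_msg:
            log.debug(log_msg)

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger("soak")
    job_id = get_id()                                           # 1, 2, 3, ... per call
    debug_logging(log, True, f"DBG: JOB {job_id} scheduled")    # emitted
    debug_logging(log, False, f"DBG: JOB {job_id} scheduled")   # suppressed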
@@ -250,6 +262,36 @@ def get_daos_server_logs(self): raise SoakTestError(f"<>") from error +def get_job_logs(self): + """Gather all job logs for the current pass of soak.""" + + # gather all the logfiles for this pass and cleanup client nodes + cmd = f"/usr/bin/rsync -avtr --min-size=1B {self.soak_log_dir} {self.outputsoak_dir}/" + cmd2 = f"/usr/bin/rm -rf {self.soak_log_dir}" + if self.enable_remote_logging: + # Limit fan out to reduce burden on filesystem + result = run_remote(self.log, self.hostlist_clients, cmd, timeout=600, fanout=64) + if result.passed: + result = run_remote(self.log, self.hostlist_clients, cmd2, timeout=600) + if not result.passed: + self.log.error("Remote copy failed on %s", str(result.failed_hosts)) + # copy script files from shared dir + sharedscr_dir = self.sharedsoak_dir + "/pass" + str(self.loop) + cmd3 = f"/usr/bin/rsync -avtr --min-size=1B {sharedscr_dir} {self.outputsoak_dir}/" + cmd4 = f"/usr/bin/rm -rf {sharedscr_dir}" + try: + run_local(self.log, cmd3, timeout=600) + run_local(self.log, cmd4, timeout=600) + except RunException as error: + self.log.info("Script file copy failed with %s", error) + # copy the local files; local host not included in hostlist_client + try: + run_local(self.log, cmd, timeout=600) + run_local(self.log, cmd2, timeout=600) + except RunException as error: + self.log.info("Local copy failed with %s", error) + + def run_monitor_check(self): """Monitor server cpu, memory usage periodically. @@ -334,11 +376,11 @@ def wait_for_pool_rebuild(self, pool, name): pool.wait_for_rebuild_to_end() rebuild_status = True except DaosTestError as error: - self.log.error(f"<<= test.end_time: results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} - log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") job_queue.put(results) # give time to update the queue before exiting time.sleep(0.5) @@ -381,7 +423,7 @@ def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_l errorlog = error_log1.replace("RHOST", str(rhost)) cmd = f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}" job_results = run_remote( - log, rhost, cmd, verbose=True, timeout=timeout * 60, task_debug=True, stderr=True) + log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) if job_results: if job_results.timeout: state = "TIMEOUT" @@ -393,7 +435,7 @@ def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_l state = "UNKNOWN" if time.time() >= test.end_time: results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} - log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") job_queue.put(results) # give time to update the queue before exiting time.sleep(0.5) @@ -405,11 +447,12 @@ def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_l if node_results.failed_hosts: for node in node_results.failed_hosts: host_list.remove(node) - log.debug("DBG: Node {node} is marked as DOWN in job {job_id}") + debug_logging( + log, test.enable_debug_msg, "DBG: Node {node} is marked as DOWN in job {job_id}") log.info("FINAL STATE: soak job %s completed with : %s at %s", job_id, state, time.ctime()) results = {"handle": job_id, "state": state, "host_list": host_list} - log.debug(f"DBG: JOB {job_id} EXITED launch_jobscript") + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED 
launch_jobscript") job_queue.put(results) # give time to update the queue before exiting time.sleep(0.5) @@ -938,8 +981,10 @@ def start_dfuse(self, pool, container, name=None, job_spec=None): self.soak_log_dir, self.test_name + "_" + name + "_`hostname -s`_" "" + "${JOB_ID}_" + "daos_dfuse.log") - dfuse_env = f"export D_LOG_FILE_APPEND_PID=1;export D_LOG_MASK=ERR;export D_LOG_FILE={dfuselog}" - module_load = f"module use {self.mpi_module_use};module load {self.mpi_module}" + dfuse_env = ";".join(["export D_LOG_FILE_APPEND_PID=1", + "export D_LOG_MASK=ERR", + f"export D_LOG_FILE={dfuselog}"]) + module_load = ";".join([f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"]) dfuse_start_cmds = [ "clush -S -w $HOSTLIST \"mkdir -p {}\"".format(dfuse.mount_dir.value), @@ -1098,7 +1143,7 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, if api in ["HDF5-VOL", "POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: sbatch_cmds.extend(stop_dfuse(dfuse, vol)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<>:") + self.log.info(f"<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1172,7 +1217,7 @@ def create_macsio_cmdline(self, job_spec, pool, ppn, nodesperjob): if api in ["HDF5-VOL"]: sbatch_cmds.extend(stop_dfuse(dfuse, vol=True)) commands.append([sbatch_cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1266,7 +1311,7 @@ def create_mdtest_cmdline(self, job_spec, pool, ppn, nodesperjob): if api in ["POSIX", "POSIX-LIBPIL4DFS", "POSIX-LIBIOIL"]: sbatch_cmds.extend(stop_dfuse(dfuse)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<>:") + self.log.info(f"<>: ") for cmd in sbatch_cmds: self.log.info(cmd) return commands @@ -1304,7 +1349,7 @@ def create_racer_cmdline(self, job_spec): cmds.append("status=$?") # add exit code commands.append([cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in cmds: self.log.info(cmd) return commands @@ -1392,7 +1437,7 @@ def create_fio_cmdline(self, job_spec, pool): cmds.append("cd -") cmds.extend(stop_dfuse(dfuse)) commands.append([cmds, log_name]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in cmds: self.log.info(cmd) return commands @@ -1474,7 +1519,7 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): if api in ["POSIX", "POSIX-LIBIOIL", "POSIX-LIBPIL4DFS"]: sbatch_cmds.extend(stop_dfuse(dfuse)) commands.append([sbatch_cmds, log_name]) - self.log.info(f"<<{job_spec.upper()} cmdlines>>:") + self.log.info(f"<<{job_spec.upper()} cmdlines>>: ") for cmd in sbatch_cmds: self.log.info("%s", cmd) if mpi_module != self.mpi_module: @@ -1532,7 +1577,7 @@ def create_dm_cmdline(self, job_spec, pool, ppn, nodesperjob): dm_commands = create_ior_cmdline( self, ior_spec, pool, ppn, nodesperjob, [[file_oclass, dir_oclass]], cont_2) sbatch_cmds.extend(dm_commands[0][0]) - self.log.info("<>:") + self.log.info("<>: ") for cmd in sbatch_cmds: self.log.info("%s", cmd) commands.append([sbatch_cmds, log_name]) From d72ff7c64beb65f7a9d604634afc230a905bf300 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Tue, 23 Jul 2024 20:07:12 -0400 Subject: [PATCH 03/25] fix base dir Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 3 ++- src/tests/ftest/util/soak_utils.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py 
b/src/tests/ftest/util/soak_test_base.py index 8f479ccfe50..c4eaf636d7d 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -95,7 +95,8 @@ def setUp(self): # Setup logging directories for soak logfiles # self.output dir is an avocado directory .../data/ self.outputsoak_dir = self.outputdir + "/soak" - self.soak_dir = self.base_test_dir + "/soak" + base_test_dir = os.getenv("DAOS_TEST_LOG_DIR", "/tmp/") + self.soak_dir = base_test_dir + "/soak" self.soaktest_dir = self.soak_dir + "/pass" + str(self.loop) # Create the a shared directory for logs self.sharedsoak_dir = self.tmp + "/soak" diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 3641b3a42fb..20ea3528016 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -251,7 +251,7 @@ def get_daos_server_logs(self): self (obj): soak obj """ daos_dir = self.outputsoak_dir + "/daos_server_logs" - logs_dir = os.environ.get("DAOS_TEST_LOG_DIR", "/var/tmp/daos_testing/") + logs_dir = os.environ.get("DAOS_TEST_LOG_DIR", "/tmp/") hosts = self.hostlist_servers if not os.path.exists(daos_dir): os.mkdir(daos_dir) From 392f60e4d0d27de8a2ce4355b9cf1d9c4c15de51 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 24 Jul 2024 08:03:08 -0400 Subject: [PATCH 04/25] add env to scripts Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 6 +++++- src/tests/ftest/util/soak_utils.py | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index c4eaf636d7d..23100916502 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -309,6 +309,10 @@ def schedule_jobs(self): job_queue = multiprocessing.Queue() jobid_list = [] node_list = self.hostlist_clients + lib_path = os.getenv("LD_LIBRARY_PATH") + path = os.getenv("PATH") + v_env = os.getenv("VIRTUAL_ENV") + env = f"export LD_LIBRARY_PATH={lib_path}; export PATH={path}; export VIRTUAL_ENV={v_env}" for job_dict in self.Job_List: jobid_list.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") @@ -339,7 +343,7 @@ def schedule_jobs(self): error_log = job_dict["joberrlog"] method = launch_jobscript params = (self.log, job_queue, job_id, job_node_list, - script, log, error_log, timeout, self) + env, script, log, error_log, timeout, self) name = f"SOAK JOB {job_id}" jobs.append(threading.Thread(target=method, args=params, name=name)) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 20ea3528016..042066005fe 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -385,7 +385,8 @@ def wait_for_pool_rebuild(self, pool, name): return rebuild_status -def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_log, timeout, test): +def launch_jobscript( + log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test): """_summary_ Args: @@ -393,12 +394,14 @@ def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_l job_queue (_type_): _description_ job_id (_type_): _description_ host_list (_type_): _description_ + env (_type_): _description_ script (_type_): _description_ job_log (_type_): _description_ error_log (_type_): _description_ timeout (_type_): _description_ test 
(_type_): _description_ """ + debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} ENTERED launch_jobscript") job_results = [] node_results = [] @@ -421,7 +424,7 @@ def launch_jobscript(log, job_queue, job_id, host_list, script, job_log, error_l error_log1 = error_log.replace("JOBID", str(job_id)) joblog = job_log1.replace("RHOST", str(rhost)) errorlog = error_log1.replace("RHOST", str(rhost)) - cmd = f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}" + cmd = f"{env} {script} {hosts} {job_id} > {joblog} 2> {errorlog}" job_results = run_remote( log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) if job_results: From 66262f265db41ac3ade5f597bd13a59334c6ca22 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 24 Jul 2024 08:04:48 -0400 Subject: [PATCH 05/25] add env Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 23100916502..8967e3c906c 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -312,7 +312,9 @@ def schedule_jobs(self): lib_path = os.getenv("LD_LIBRARY_PATH") path = os.getenv("PATH") v_env = os.getenv("VIRTUAL_ENV") - env = f"export LD_LIBRARY_PATH={lib_path}; export PATH={path}; export VIRTUAL_ENV={v_env}" + env = ";".join([f"export LD_LIBRARY_PATH={lib_path}", + f"export PATH={path}", + f"export VIRTUAL_ENV={v_env}"]) for job_dict in self.Job_List: jobid_list.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") From c42b72b745d1e9472d20f18029143ff2b86c2dc5 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 24 Jul 2024 10:09:44 -0400 Subject: [PATCH 06/25] fix typo in env cmd Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 042066005fe..5a27950a739 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -424,7 +424,7 @@ def launch_jobscript( error_log1 = error_log.replace("JOBID", str(job_id)) joblog = job_log1.replace("RHOST", str(rhost)) errorlog = error_log1.replace("RHOST", str(rhost)) - cmd = f"{env} {script} {hosts} {job_id} > {joblog} 2> {errorlog}" + cmd = ";".join([env, f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}"]) job_results = run_remote( log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) if job_results: From 2bc80627a803223bc353b5011d443659c7c56fc8 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 25 Jul 2024 11:39:11 -0400 Subject: [PATCH 07/25] debug - remove daos_client partition Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/soak/harassers.yaml | 2 +- src/tests/ftest/soak/smoke.yaml | 2 +- src/tests/ftest/soak/stress.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/ftest/soak/harassers.yaml b/src/tests/ftest/soak/harassers.yaml index c02a0ae90ce..2de5b15bc07 100644 --- a/src/tests/ftest/soak/harassers.yaml +++ b/src/tests/ftest/soak/harassers.yaml @@ -3,7 +3,7 @@ hosts: 
test_servers: 8 # servers if a server partition is defined # server_partition: daos_server - client_partition: daos_client + # client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true diff --git a/src/tests/ftest/soak/smoke.yaml b/src/tests/ftest/soak/smoke.yaml index ca1d4fb7a4c..4c5594b4ab8 100644 --- a/src/tests/ftest/soak/smoke.yaml +++ b/src/tests/ftest/soak/smoke.yaml @@ -3,7 +3,7 @@ hosts: test_servers: 4 # servers if a server partition is defined # server_partition: daos_server - client_partition: daos_client + # client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true diff --git a/src/tests/ftest/soak/stress.yaml b/src/tests/ftest/soak/stress.yaml index 15a6a3033a3..4031caff216 100644 --- a/src/tests/ftest/soak/stress.yaml +++ b/src/tests/ftest/soak/stress.yaml @@ -3,7 +3,7 @@ hosts: test_servers: 8 # servers if a server partition is defined # server_partition: daos_server - client_partition: daos_client + # client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true From 239049670b01c4f95e7627c93921f301fcf64b35 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Fri, 26 Jul 2024 18:26:46 -0400 Subject: [PATCH 08/25] use general utils get_journalctl Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 4 ++-- src/tests/ftest/util/soak_utils.py | 22 +++++++++------------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 8967e3c906c..3dc1f794571 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -26,7 +26,7 @@ create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, create_racer_cmdline, ddhhmmss_format, debug_logging, get_daos_server_logs, - get_harassers, get_id, get_job_logs, get_journalctl, + get_harassers, get_id, get_job_logs, get_journalctl_logs, launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) @@ -182,7 +182,7 @@ def pre_tear_down(self): since = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.start_time)) until = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.end_time)) for journalctl_type in ["kernel", "daos_server"]: - get_journalctl(self, hosts, since, until, journalctl_type, logging=True) + get_journalctl_logs(self, hosts, since, until, journalctl_type) if self.all_failed_harassers: errors.extend(self.all_failed_harassers) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 5a27950a739..99bdf270900 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -24,7 +24,7 @@ from duns_utils import format_path from exception_utils import CommandFailure from fio_utils import FioCommand -from general_utils import (DaosTestError, check_ping, check_ssh, get_host_data, get_log_file, +from general_utils import (DaosTestError, check_ping, check_ssh, get_journalctl, get_log_file, get_random_bytes, get_random_string, list_to_str, pcmd, run_command, run_pcmd, wait_for_result) from ior_utils import IorCommand @@ -199,7 +199,7 @@ def run_event_check(self, since, until): hosts = 
list(set(self.hostlist_servers)) if events: for journalctl_type in ["kernel", "daos_server"]: - for output in get_journalctl(self, hosts, since, until, journalctl_type): + for output in get_journalctl(hosts, since, until, journalctl_type): for event in events: lines = output["data"].splitlines() for line in lines: @@ -213,7 +213,7 @@ def run_event_check(self, since, until): return events_found -def get_journalctl(self, hosts, since, until, journalctl_type, logging=False): +def get_journalctl_logs(self, hosts, since, until, journalctl_type): """Run the journalctl on daos servers. Args: @@ -229,18 +229,14 @@ def get_journalctl(self, hosts, since, until, journalctl_type, logging=False): "data": data requested for the group of hosts """ - command = "{} /usr/bin/journalctl --system -t {} --since=\"{}\" --until=\"{}\"".format( - self.sudo_cmd, journalctl_type, since, until) - err = "Error gathering system log events" - results = get_host_data(hosts, command, "journalctl", err) + results = get_journalctl(hosts, since, until, journalctl_type) name = f"journalctl_{journalctl_type}.log" destination = self.outputsoak_dir - if logging: - for result in results: - for host in result["hosts"]: - log_name = name + "-" + str(host) - self.log.info("Logging %s output to %s", command, log_name) - write_logfile(result["data"], log_name, destination) + for result in results: + for host in result["hosts"]: + log_name = name + "-" + str(host) + self.log.info("Logging output to %s", log_name) + write_logfile(result["data"], log_name, destination) return results From 86bfb2587f48458a24f78acf8dc94a761698aea4 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Tue, 30 Jul 2024 10:50:09 -0400 Subject: [PATCH 09/25] fix liniting issues Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/soak/harassers.yaml | 2 +- src/tests/ftest/soak/smoke.yaml | 2 +- src/tests/ftest/soak/stress.yaml | 2 +- src/tests/ftest/util/soak_test_base.py | 23 ++++++++--------- src/tests/ftest/util/soak_utils.py | 35 +++++++++++++++----------- 5 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/tests/ftest/soak/harassers.yaml b/src/tests/ftest/soak/harassers.yaml index 2de5b15bc07..c02a0ae90ce 100644 --- a/src/tests/ftest/soak/harassers.yaml +++ b/src/tests/ftest/soak/harassers.yaml @@ -3,7 +3,7 @@ hosts: test_servers: 8 # servers if a server partition is defined # server_partition: daos_server - # client_partition: daos_client + client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true diff --git a/src/tests/ftest/soak/smoke.yaml b/src/tests/ftest/soak/smoke.yaml index 4c5594b4ab8..ca1d4fb7a4c 100644 --- a/src/tests/ftest/soak/smoke.yaml +++ b/src/tests/ftest/soak/smoke.yaml @@ -3,7 +3,7 @@ hosts: test_servers: 4 # servers if a server partition is defined # server_partition: daos_server - # client_partition: daos_client + client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true diff --git a/src/tests/ftest/soak/stress.yaml b/src/tests/ftest/soak/stress.yaml index 4031caff216..15a6a3033a3 100644 --- a/src/tests/ftest/soak/stress.yaml +++ b/src/tests/ftest/soak/stress.yaml @@ -3,7 +3,7 @@ hosts: test_servers: 8 # servers if a server partition is defined # server_partition: daos_server - # client_partition: daos_client + client_partition: daos_client # client_reservation: daos-test orterun: allow_run_as_root: true diff --git a/src/tests/ftest/util/soak_test_base.py 
b/src/tests/ftest/util/soak_test_base.py index 3dc1f794571..21633475c06 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -31,8 +31,6 @@ launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) -J_LOCK = threading.Lock() - class SoakTestBase(TestWithServers): # pylint: disable=too-many-public-methods @@ -82,7 +80,7 @@ def __init__(self, *args, **kwargs): self.soak_dir = None self.enable_scrubber = False self.job_scheduler = None - self.Job_List = None + self.joblist = None self.enable_debug_msg = False def setUp(self): @@ -315,7 +313,7 @@ def schedule_jobs(self): env = ";".join([f"export LD_LIBRARY_PATH={lib_path}", f"export PATH={path}", f"export VIRTUAL_ENV={v_env}"]) - for job_dict in self.Job_List: + for job_dict in self.joblist: jobid_list.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") while True: @@ -323,7 +321,7 @@ def schedule_jobs(self): break jobs = [] job_results = {} - for job_dict in self.Job_List: + for job_dict in self.joblist: job_id = job_dict["jobid"] if job_id in jobid_list: node_count = job_dict["nodesperjob"] @@ -424,7 +422,7 @@ def job_setup(self, jobs, pool): # Create a dictionary of all job definitions for jobscript in jobscripts: jobtimeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) - self.Job_List.extend([{"jobscript": jobscript[0], + self.joblist.extend([{"jobscript": jobscript[0], "nodesperjob": npj, "taskspernode": ppn, "hostlist": None, @@ -434,8 +432,7 @@ def job_setup(self, jobs, pool): "joberrlog": jobscript[2]}]) # randomize job list random.seed(4) - random.shuffle(self.Job_List) - return + random.shuffle(self.joblist) def job_startup(self): """Launch the job script. 
@@ -452,7 +449,7 @@ def job_startup(self): return job_id_list if self.job_scheduler == "slurm": - for job_dict in self.Job_List: + for job_dict in self.joblist: script = job_dict["jobscript"] try: job_id = slurm_utils.run_slurm_script(self.log, str(script)) @@ -473,7 +470,7 @@ def job_startup(self): job_id_list = [] raise SoakTestError(f"<>") else: - for job_dict in self.Job_List: + for job_dict in self.joblist: job_dict["jobid"] = get_id() job_id_list.append(job_dict["jobid"]) @@ -495,6 +492,8 @@ def job_completion(self, job_id_list): failed_job_id_list: IDs of each job that failed in slurm """ + # pylint: disable=too-many-nested-blocks + self.log.info("<> at %s", self.test_name, time.ctime()) harasser_interval = 0 failed_harasser_msg = None @@ -591,7 +590,7 @@ def execute_jobs(self, jobs, pools): """ jobid_list = [] - self.Job_List = [] + self.joblist = [] # Update the remote log directories from new loop/pass sharedsoaktest_dir = self.sharedsoak_dir + "/pass" + str(self.loop) outputsoaktest_dir = self.outputsoak_dir + "/pass" + str(self.loop) @@ -638,7 +637,7 @@ def run_soak(self, test_param): """ self.soak_results = {} - self.Job_List = [] + self.joblist = [] self.pool = [] self.container = [] self.harasser_results = {} diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 99bdf270900..54a59652bf3 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -59,16 +59,21 @@ def ddhhmmss_format(seconds): def get_id(): + """Increment a counter to generate job ids + + Returns: + int : next counter value + """ return next(id_counter) def debug_logging(log, enable_debug_msg, log_msg): - """_summary_ + """Enable debug messages in log file. Args: - log (_type_): _description_ - enable_debug_msg (_type_): _description_ - log_msg (_type_): _description_ + log (logger): logger for the messages produced by this method + enable_debug_msg (boolean): If true, the debug message will be written to log + log_msg (str): debug message to write to log """ if enable_debug_msg: log.debug(log_msg) @@ -383,19 +388,19 @@ def wait_for_pool_rebuild(self, pool, name): def launch_jobscript( log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test): - """_summary_ + """Launch the job script on remote node. 
Args: - log (_type_): _description_ - job_queue (_type_): _description_ - job_id (_type_): _description_ - host_list (_type_): _description_ - env (_type_): _description_ - script (_type_): _description_ - job_log (_type_): _description_ - error_log (_type_): _description_ - timeout (_type_): _description_ - test (_type_): _description_ + log (logger): logger for the messages produced by this method + job_queue (Queue): job queue to post status of job + job_id (int): unique job identifier + host_list (list): list of node to pass to job script + env (str): environment variables for job script + script (str): full path to jobscript + job_log (str): job std out + error_log (str): job std error + timeout (int): job timeout + test (TestObj): soak test obj """ debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} ENTERED launch_jobscript") From 04665e779be41e0ce6ab60ef267d8a21d48fe41e Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 1 Aug 2024 08:03:24 -0400 Subject: [PATCH 10/25] fix linting issues Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Required-githooks: true Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 14 +++++++------- src/tests/ftest/util/soak_utils.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 8ec4bcb07d5..4ca7b241586 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -421,13 +421,13 @@ def job_setup(self, jobs, pool): for jobscript in jobscripts: jobtimeout = self.params.get("job_timeout", "/run/" + job + "/*", 10) self.joblist.extend([{"jobscript": jobscript[0], - "nodesperjob": npj, - "taskspernode": ppn, - "hostlist": None, - "jobid": None, - "jobtimeout": jobtimeout, - "joblog": jobscript[1], - "joberrlog": jobscript[2]}]) + "nodesperjob": npj, + "taskspernode": ppn, + "hostlist": None, + "jobid": None, + "jobtimeout": jobtimeout, + "joblog": jobscript[1], + "joberrlog": jobscript[2]}]) # randomize job list random.seed(4) random.shuffle(self.joblist) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 1dcf9ec6832..75486db17e8 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -394,7 +394,7 @@ def launch_jobscript( job_id (int): unique job identifier host_list (list): list of node to pass to job script env (str): environment variables for job script - script (str): full path to jobscript + script (str): full path to job script job_log (str): job std out error_log (str): job std error timeout (int): job timeout From 61e742251c2f66fe8b9ab1615febedf8bee45ec6 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 7 Aug 2024 10:20:30 -0400 Subject: [PATCH 11/25] Add jobs_not_done to track completed jobs Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 24 +++++---- src/tests/ftest/util/soak_utils.py | 69 +++++++++++++++++++------- 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index ded9233d861..05ac292b65c 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -26,7 +26,7 @@ create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline, create_racer_cmdline, ddhhmmss_format, 
debug_logging, get_daos_server_logs, - get_harassers, get_id, get_job_logs, get_journalctl_logs, + get_harassers, get_id, get_job_logs, get_journalctl_logs, job_cleanup, launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot, launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check) @@ -146,7 +146,8 @@ def pre_tear_down(self): if self.all_failed_jobs: errors.append("SOAK FAILED: The following jobs failed {} ".format( " ,".join(str(j_id) for j_id in self.all_failed_jobs))) - + # cleanup any remaining jobs + job_cleanup(self.log, self.hostlist_clients) # verify reserved container data if self.resv_cont: final_resv_file = os.path.join(self.test_dir, "final", "resv_file") @@ -313,8 +314,9 @@ def schedule_jobs(self): for job_dict in self.joblist: jobid_list.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") + jobs_not_done = jobid_list while True: - if time.time() > self.end_time or len(jobid_list) == 0: + if time.time() > self.end_time or len(jobs_not_done) == 0: break jobs = [] job_results = {} @@ -343,7 +345,8 @@ def schedule_jobs(self): env, script, log, error_log, timeout, self) name = f"SOAK JOB {job_id}" - jobs.append(threading.Thread(target=method, args=params, name=name)) + jobs.append(threading.Thread( + target=method, args=params, name=name, daemon=True)) jobid_list.remove(job_id) node_list = node_list[node_count:] debug_logging( @@ -353,13 +356,14 @@ def schedule_jobs(self): # run job scripts on all available nodes for job in jobs: job.start() - debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started") - for job in jobs: - job.join() - debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined") + # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started") + # for job in jobs: + # job.join() + # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined") while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue + jobs_not_done.remove(job_results["handle"]) node_list.update(job_results["host_list"]) debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") self.soak_results[job_results["handle"]] = job_results["state"] @@ -474,7 +478,7 @@ def job_startup(self): # self.schedule_jobs() method = self.schedule_jobs name = "Job Scheduler" - scheduler = threading.Thread(target=method, name=name) + scheduler = threading.Thread(target=method, name=name, daemon=True) # scheduler = multiprocessing.Process(target=method, name=name) scheduler.start() @@ -658,7 +662,7 @@ def run_soak(self, test_param): resv_bytes = self.params.get("resv_bytes", test_param + "*", 500000000) ignore_soak_errors = self.params.get("ignore_soak_errors", test_param + "*", False) self.enable_il = self.params.get("enable_intercept_lib", test_param + "*", False) - self.sudo_cmd = "sudo" if enable_sudo else "" + self.sudo_cmd = "sudo -n" if enable_sudo else "" self.enable_remote_logging = self.params.get( "enable_remote_logging", os.path.join(test_param, "*"), False) self.enable_scrubber = self.params.get( diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 75486db17e8..6865f16fb15 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -5,6 +5,7 @@ """ # pylint: disable=too-many-lines +import getpass import os import random import re @@ -16,6 +17,7 @@ from avocado.core.exceptions import TestFail from 
avocado.utils.distro import detect from ClusterShell.NodeSet import NodeSet +from command_utils import command_as_user from command_utils_base import EnvironmentVariables from daos_racer_utils import DaosRacerCommand from data_mover_utils import DcpCommand, FsCopy @@ -367,7 +369,8 @@ def wait_for_pool_rebuild(self, pool, name): """ rebuild_status = False self.log.info("<> at %s", name, pool.identifier, time.ctime()) - self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False) + # TODO - create yaml param to enable/disable rebuild debug logging + # self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False) try: # # Wait for rebuild to start # pool.wait_for_rebuild_to_start() @@ -380,10 +383,34 @@ def wait_for_pool_rebuild(self, pool, name): except TestFail as error1: self.log.error( f"<< {joblog} 2> {errorlog}"]) + cmd = ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"]) job_results = run_remote( log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False) if job_results: @@ -435,6 +462,8 @@ def launch_jobscript( state = "FAILED" else: state = "UNKNOWN" + # attempt to cleanup any leftover job processes on timeout + job_cleanup(log, hosts) if time.time() >= test.end_time: results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") @@ -625,16 +654,16 @@ def launch_reboot(self, pools, name, results, args): self.log.info( "<<>>\n", self.loop, name, ranks, time.ctime()) # reboot host in 1 min - result = run_remote(self.log, reboot_host, "sudo shutdown -r +1") + result = run_remote(self.log, reboot_host, command_as_user("shutdown -r +1", "root")) if result.passed: status = True else: - self.log.error(f"<<>>\n", self.loop, name, reboot_host, time.ctime()) - cmd_results = run_remote(self.log, reboot_host, "sudo systemctl restart daos_server") + cmd_results = run_remote( + self.log, reboot_host, command_as_user("systemctl restart daos_server", "root")) if cmd_results.passed: self.dmg_command.system_query() for pool in pools: @@ -983,9 +1013,10 @@ def start_dfuse(self, pool, container, name=None, job_spec=None): self.soak_log_dir, self.test_name + "_" + name + "_`hostname -s`_" "" + "${JOB_ID}_" + "daos_dfuse.log") - dfuse_env = ";".join(["export D_LOG_FILE_APPEND_PID=1", - "export D_LOG_MASK=ERR", - f"export D_LOG_FILE={dfuselog}"]) + dfuse_env = ";".join( + ["export D_LOG_FILE_APPEND_PID=1", + "export D_LOG_MASK=ERR", + f"export D_LOG_FILE={dfuselog}"]) module_load = ";".join([f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"]) dfuse_start_cmds = [ @@ -1015,8 +1046,8 @@ def stop_dfuse(dfuse, vol=False): dfuse.mount_dir.value)]) dfuse_stop_cmds.extend([ - "clush -S -w $HOSTLIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value), - "clush -S -w $HOSTLIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)]) + f'clush -S -w $HOSTLIST "fusermount3 -uz {dfuse.mount_dir.value}"', + f'clush -S -w $HOSTLIST "rm -rf {dfuse.mount_dir.value}"']) return dfuse_stop_cmds @@ -1473,14 +1504,14 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob): # ${DAOS_TEST_APP_SRC}/suse => apps built with suse and gnu-mpich # pylint: disable-next=wrong-spelling-in-comment,fixme # ${DAOS_TEST_APP_SRC}/suse/intelmpi => apps built with suse and intelmpi - if "suse" in detect().name.lower(): + if "suse" in detect().name.lower() and os.environ.get("DAOS_TEST_MODE") is None: os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse") - if 
"mpi/latest" in mpi_module: + if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None: os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi") os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0" app_cmd = os.path.expandvars(self.params.get("cmdline", app_params, default=None)) if app_cmd is None: - self.log.info(f"<<{job_spec} command line not specified in yaml; job will not be run>>") + self.log.info(f"<<{job_spec} command line not specified in yaml>>") return commands oclass_list = self.params.get("oclass", app_params) for file_oclass, dir_oclass in oclass_list: @@ -1682,6 +1713,8 @@ def build_job_script(self, commands, job, nodesperjob, ppn): else: script_file.write("HOSTLIST=$1 \n") script_file.write("JOB_ID=$2 \n") + script_file.write("JOB_LOG=$3 \n") + script_file.write("JOB_ERROR_LOG=$4 \n") script_file.write("echo \"JOB NODES: \" $HOSTLIST \n") script_file.write("echo \"JOB ID: \" $JOB_ID \n") script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") @@ -1689,6 +1722,8 @@ def build_job_script(self, commands, job, nodesperjob, ppn): script_file.write("else \n") script_file.write(" source $VIRTUAL_ENV/bin/activate \n") script_file.write("fi \n") + script_file.write("exec 1> $JOB_LOG \n") + script_file.write("exec 2> $JOB_ERROR_LOG \n") for cmd in list(job_cmds): script_file.write(cmd + "\n") From e784fa3591f07453b9c3b9db194e38894a0e907f Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 7 Aug 2024 12:08:42 -0400 Subject: [PATCH 12/25] debug Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 05ac292b65c..7d98f86de58 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -316,7 +316,7 @@ def schedule_jobs(self): self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") jobs_not_done = jobid_list while True: - if time.time() > self.end_time or len(jobs_not_done) == 0: + if time.time() > self.end_time or len(jobid_list) == 0: break jobs = [] job_results = {} @@ -363,7 +363,6 @@ def schedule_jobs(self): while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue - jobs_not_done.remove(job_results["handle"]) node_list.update(job_results["host_list"]) debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") self.soak_results[job_results["handle"]] = job_results["state"] From cbae14bec1c494e83e12128b8e2287aa66146cdc Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 7 Aug 2024 16:01:47 -0400 Subject: [PATCH 13/25] Check for all jobs to be done Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 7d98f86de58..c94469c4734 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -304,6 +304,7 @@ def schedule_jobs(self): debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs ENTERED ") job_queue = multiprocessing.Queue() jobid_list = [] + jobs_not_done = [] node_list = self.hostlist_clients lib_path = os.getenv("LD_LIBRARY_PATH") path = os.getenv("PATH") @@ -313,10 +314,10 @@ def schedule_jobs(self): f"export VIRTUAL_ENV={v_env}"]) for job_dict in self.joblist: 
jobid_list.append(job_dict["jobid"]) + jobs_not_done.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") - jobs_not_done = jobid_list while True: - if time.time() > self.end_time or len(jobid_list) == 0: + if time.time() > self.end_time or len(jobs_not_done) == 0: break jobs = [] job_results = {} @@ -356,16 +357,15 @@ def schedule_jobs(self): # run job scripts on all available nodes for job in jobs: job.start() - # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started") - # for job in jobs: - # job.join() - # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined") + while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue node_list.update(job_results["host_list"]) debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") self.soak_results[job_results["handle"]] = job_results["state"] + job_done_id = job_results["handle"] + jobs_not_done.remove(job_done_id) debug_logging( self.log, self.enable_debug_msg, From 4cd4cb1b222486741d29ae102d171f615ef4f3c2 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 22 Aug 2024 08:53:02 -0400 Subject: [PATCH 14/25] update jobscript VIRTUAL_ENV check Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index c94469c4734..e3cae2b369f 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -310,8 +310,9 @@ def schedule_jobs(self): path = os.getenv("PATH") v_env = os.getenv("VIRTUAL_ENV") env = ";".join([f"export LD_LIBRARY_PATH={lib_path}", - f"export PATH={path}", - f"export VIRTUAL_ENV={v_env}"]) + f"export PATH={path}"]) + if v_env: + env = ";".join([env, f"export VIRTUAL_ENV={v_env}"]) for job_dict in self.joblist: jobid_list.append(job_dict["jobid"]) jobs_not_done.append(job_dict["jobid"]) From f4a8e3ce0be7eee00bb44cd3f0828f97f56ffb22 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Sun, 25 Aug 2024 10:25:32 -0400 Subject: [PATCH 15/25] include join Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index eabdaa3f6c1..3952649fef6 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -360,6 +360,9 @@ def schedule_jobs(self): for job in jobs: job.start() + for job in jobs: + job.join() + while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue From be071c623f343886b7869d759e90bb2f512e19d1 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 28 Aug 2024 07:30:18 -0400 Subject: [PATCH 16/25] include HDF5 vol env vars for container Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 44d88777eca..0467d6ed2fb 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -1167,6 +1167,9 @@ def create_ior_cmdline(self, job_spec, pool, ppn, nodesperjob, oclass_list=None, # add envs if api is HDF5-VOL if api 
== "HDF5-VOL": vol = True + cont_props = container.properties.value + env["HDF5_DAOS_FILE_PROP"] = '"' + cont_props.replace(",", ";") + '"' + env["HDF5_DAOS_OBJ_CLASS"] = file_dir_oclass[0] env["HDF5_VOL_CONNECTOR"] = "daos" env["HDF5_PLUGIN_PATH"] = str(plugin_path) mpirun_cmd.assign_processes(nodesperjob * ppn) From c0a06567492da9d07d939fd9d388fa8c596bbd17 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Mon, 9 Sep 2024 14:46:17 -0400 Subject: [PATCH 17/25] Include updates fro feedback Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 39 ++++++++++++++++++++------ src/tests/ftest/util/soak_utils.py | 9 +++--- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 3952649fef6..d3f898343b2 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -83,6 +83,7 @@ def __init__(self, *args, **kwargs): self.joblist = None self.enable_debug_msg = False self.enable_rebuild_logmasks = False + self.down_nodes = None def setUp(self): """Define test setup to be done.""" @@ -306,7 +307,9 @@ def schedule_jobs(self): job_queue = multiprocessing.Queue() jobid_list = [] jobs_not_done = [] + # remove any nodes marked as DOWN node_list = self.hostlist_clients + node_list.difference_update(self.down_nodes) lib_path = os.getenv("LD_LIBRARY_PATH") path = os.getenv("PATH") v_env = os.getenv("VIRTUAL_ENV") @@ -318,10 +321,10 @@ def schedule_jobs(self): jobid_list.append(job_dict["jobid"]) jobs_not_done.append(job_dict["jobid"]) self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") + job_threads = [] while True: if time.time() > self.end_time or len(jobs_not_done) == 0: break - jobs = [] job_results = {} for job_dict in self.joblist: job_id = job_dict["jobid"] @@ -347,26 +350,40 @@ def schedule_jobs(self): params = (self.log, job_queue, job_id, job_node_list, env, script, log, error_log, timeout, self) name = f"SOAK JOB {job_id}" - - jobs.append(threading.Thread( - target=method, args=params, name=name, daemon=True)) + _thread = threading.Thread( + target=method, args=params, name=name, daemon=True) + job_threads.append(_thread) jobid_list.remove(job_id) node_list = node_list[node_count:] debug_logging( self.log, self.enable_debug_msg, f"DBG: node_list after launch_job {node_list}") - # run job scripts on all available nodes - for job in jobs: - job.start() - for job in jobs: + # Start this job + _thread.start() + + # If we don't process any results this time, we'll sleep before checking again + do_sleep = True + + # Keep reference only to threads that are still running + _alive_threads = [] + for job in job_threads: + if job.is_alive(): + _alive_threads.append(job) + continue + # join finished threads to be safe job.join() + # Don't sleep - starting scheduling immediately + do_sleep = False + job_threads = _alive_threads + # Process results, if any while not job_queue.empty(): job_results = job_queue.get() # Results to return in queue node_list.update(job_results["host_list"]) + self.down_nodes.update(job_results["down_nodes"]) debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results") self.soak_results[job_results["handle"]] = job_results["state"] job_done_id = job_results["handle"] @@ -375,6 +392,11 @@ def schedule_jobs(self): self.log, self.enable_debug_msg, f"DBG: node_list returned from queue {node_list}") + + # Sleep to avoid spinlock + if 
do_sleep: + time.sleep(3) + debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs EXITED ") def job_setup(self, jobs, pool): @@ -653,6 +675,7 @@ def run_soak(self, test_param): self.soak_errors = [] self.check_errors = [] self.used = [] + self.down_nodes = NodeSet() self.enable_debug_msg = self.params.get("enable_debug_msg", "/run/*", default=False) self.mpi_module = self.params.get("mpi_module", "/run/*", default="mpi/mpich-x86_64") self.mpi_module_use = self.params.get( diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index abf038d2d40..cbece5b0d36 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -254,7 +254,7 @@ def get_daos_server_logs(self): self (obj): soak obj """ daos_dir = self.outputsoak_dir + "/daos_server_logs" - logs_dir = os.environ.get("DAOS_TEST_LOG_DIR", "/tmp/") + logs_dir = self.test_env.log_dir hosts = self.hostlist_servers if not os.path.exists(daos_dir): os.mkdir(daos_dir) @@ -433,6 +433,7 @@ def launch_jobscript( debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} ENTERED launch_jobscript") job_results = [] node_results = [] + down_nodes = NodeSet() state = "UNKNOWN" if time.time() >= test.end_time: results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list} @@ -480,11 +481,11 @@ def launch_jobscript( if node_results.failed_hosts: for node in node_results.failed_hosts: host_list.remove(node) - debug_logging( - log, test.enable_debug_msg, "DBG: Node {node} is marked as DOWN in job {job_id}") + down_nodes.update(node) + log.info(f"DBG: Node {node} is marked as DOWN in job {job_id}") log.info("FINAL STATE: soak job %s completed with : %s at %s", job_id, state, time.ctime()) - results = {"handle": job_id, "state": state, "host_list": host_list} + results = {"handle": job_id, "state": state, "host_list": host_list, "down_nodes": down_nodes} debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript") job_queue.put(results) # give time to update the queue before exiting From d3d39f187592c0fbfe37779678b3e82514c3e133 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 11 Sep 2024 08:36:50 -0400 Subject: [PATCH 18/25] fix linting Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 6 +++--- src/tests/ftest/util/soak_utils.py | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index d3f898343b2..d1034fcd2c9 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -320,7 +320,7 @@ def schedule_jobs(self): for job_dict in self.joblist: jobid_list.append(job_dict["jobid"]) jobs_not_done.append(job_dict["jobid"]) - self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}") + self.log.info("Submitting %s jobs at %s", str(len(jobid_list)), time.ctime()) job_threads = [] while True: if time.time() > self.end_time or len(jobs_not_done) == 0: @@ -393,7 +393,7 @@ def schedule_jobs(self): self.enable_debug_msg, f"DBG: node_list returned from queue {node_list}") - # Sleep to avoid spinlock + # Sleep to avoid spin lock if do_sleep: time.sleep(3) @@ -547,7 +547,7 @@ def job_completion(self, job_id_list): else: # update soak_results to include job id NOT run and set state = CANCELLED for job in job_id_list: - if job not in list(self.soak_results.keys()): + if job not in self.soak_results: 
self.soak_results.update({job: "CANCELLED"}) self.log.info("FINAL STATE: soak job %s completed with : %s at %s", job, "CANCELLED", time.ctime()) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index cbece5b0d36..caf466f630f 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -1690,8 +1690,6 @@ def build_job_script(self, commands, job, nodesperjob, ppn): if sbatch_params: for key, value in list(sbatch_params.items()): if value is not None: - if key == "error": - value = value script_file.write("#SBATCH --{}={}\n".format(key, value)) else: script_file.write("#SBATCH --{}\n".format(key)) From 2dc93d9a82598e66a87d49509e6c1e6fa51b23dc Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Wed, 11 Sep 2024 13:23:46 -0400 Subject: [PATCH 19/25] fix quotes Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index caf466f630f..4d240125648 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -1701,18 +1701,18 @@ def build_job_script(self, commands, job, nodesperjob, ppn): script_file.write("fi \n") script_file.write("HOSTLIST=`nodeset -e -S \",\" $SLURM_JOB_NODELIST` \n") script_file.write("JOB_ID=$SLURM_JOB_ID \n") - script_file.write("echo \"SLURM NODES: \" $SLURM_JOB_NODELIST \n") - script_file.write("echo \"NODE COUNT: \" $SLURM_JOB_NUM_NODES \n") - script_file.write("echo \"JOB ID: \" $JOB_ID \n") - script_file.write("echo \"HOSTLIST: \" $HOSTLIST \n") + script_file.write("echo \"SLURM NODES: $SLURM_JOB_NODELIST \" \n") + script_file.write("echo \"NODE COUNT: $SLURM_JOB_NUM_NODES \" \n") + script_file.write("echo \"JOB ID: $JOB_ID \" \n") + script_file.write("echo \"HOSTLIST: $HOSTLIST \" \n") script_file.write("\n") else: script_file.write("HOSTLIST=$1 \n") script_file.write("JOB_ID=$2 \n") script_file.write("JOB_LOG=$3 \n") script_file.write("JOB_ERROR_LOG=$4 \n") - script_file.write("echo \"JOB NODES: \" $HOSTLIST \n") - script_file.write("echo \"JOB ID: \" $JOB_ID \n") + script_file.write("echo \"JOB NODES: $HOSTLIST \" \n") + script_file.write("echo \"JOB ID: $JOB_ID \" \n") script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n") script_file.write(" echo \"VIRTUAL_ENV not defined\" \n") script_file.write("else \n") From 5fa1404ef37646058360c1140a57512771a13877 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 19 Sep 2024 07:07:19 -0400 Subject: [PATCH 20/25] change check for down nodes Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py index 4d240125648..b94c08f4278 100644 --- a/src/tests/ftest/util/soak_utils.py +++ b/src/tests/ftest/util/soak_utils.py @@ -476,7 +476,7 @@ def launch_jobscript( return # check if all nodes are available - cmd = "hostname -s" + cmd = f"ls {test.test_env.log_dir}" node_results = run_remote(log, NodeSet(hosts), cmd, verbose=False) if node_results.failed_hosts: for node in node_results.failed_hosts: From 8fde68e37194f56ebde55b138c92cc5e04ee442c Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 19 Sep 2024 07:10:39 -0400 Subject: [PATCH 21/25] Add pragmas Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen 
Jean From 7e608a6126b9bc10e9fee729815d42fc70c719b3 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Thu, 19 Sep 2024 21:07:57 -0400 Subject: [PATCH 22/25] exit soak if not enough nodes Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 36 +++++++++++++++++++------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index d1034fcd2c9..07d46530090 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -301,14 +301,17 @@ def harasser_job_done(self, args): self.harasser_results[args["name"]] = args["status"] self.harasser_args[args["name"]] = args["vars"] - def schedule_jobs(self): - """Schedule jobs with internal scheduler.""" + def schedule_jobs(self, node_list): + """Schedule jobs with internal scheduler. + + Args: + node_list (list): list of nodes to use in jobs + """ debug_logging(self.log, self.enable_debug_msg, "DBG: schedule_jobs ENTERED ") job_queue = multiprocessing.Queue() jobid_list = [] jobs_not_done = [] # remove any nodes marked as DOWN - node_list = self.hostlist_clients node_list.difference_update(self.down_nodes) lib_path = os.getenv("LD_LIBRARY_PATH") path = os.getenv("PATH") @@ -326,6 +329,16 @@ def schedule_jobs(self): if time.time() > self.end_time or len(jobs_not_done) == 0: break job_results = {} + # verify that there are enough nodes to run remaining jobs + if len(job_threads) == 0: + for job_dict in self.joblist: + job_id = job_dict["jobid"] + if job_id in jobs_not_done: + node_count = job_dict["nodesperjob"] + if len(node_list) < node_count: + # cancel job + self.soak_results.update({job_id: "CANCELLED"}) + jobs_not_done.remove(job_id) for job_dict in self.joblist: job_id = job_dict["jobid"] if job_id in jobid_list: @@ -416,14 +429,17 @@ def job_setup(self, jobs, pool): jobscripts = [] # command is a list of [sbatch_cmds, log_name] to create a single job script commands = [] + total_nodes = NodeSet(self.hostlist_clients) + if self.down_nodes: + total_nodes.difference_update(self.down_nodes) nodesperjob = self.params.get("nodesperjob", "/run/" + job + "/*", [1]) taskspernode = self.params.get("taskspernode", "/run/" + job + "/*", [1]) for npj in list(nodesperjob): # nodesperjob = -1 indicates to use all nodes in client hostlist if npj < 0: - npj = len(self.hostlist_clients) - if len(self.hostlist_clients) / npj < 1: - raise SoakTestError(f"< Date: Thu, 19 Sep 2024 21:08:44 -0400 Subject: [PATCH 23/25] Add pragmas Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean From 5ae46a635c27f170b799122cc18e9a5cb916b43a Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Sun, 22 Sep 2024 07:27:50 -0400 Subject: [PATCH 24/25] add logging message when job is cancelled Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 07d46530090..66f7ea4e3d4 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -338,6 +338,11 @@ def schedule_jobs(self, node_list): if len(node_list) < node_count: # cancel job self.soak_results.update({job_id: "CANCELLED"}) + log.info( + "FINAL STATE: soak job %s completed with : %s at %s", + job_id, + "CANCELLED", + time.ctime()) jobs_not_done.remove(job_id) for job_dict in self.joblist: job_id = job_dict["jobid"] From 
079866390875d4c9aa01cf6d4be97d1d98a32ab6 Mon Sep 17 00:00:00 2001 From: Maureen Jean Date: Mon, 23 Sep 2024 06:49:07 -0400 Subject: [PATCH 25/25] fix logging Skip-unit-tests: true Skip-fault-injection-test: true Test-tag: soak_smoke Signed-off-by: Maureen Jean --- src/tests/ftest/util/soak_test_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py index 66f7ea4e3d4..fc9161f3f3a 100644 --- a/src/tests/ftest/util/soak_test_base.py +++ b/src/tests/ftest/util/soak_test_base.py @@ -338,7 +338,7 @@ def schedule_jobs(self, node_list): if len(node_list) < node_count: # cancel job self.soak_results.update({job_id: "CANCELLED"}) - log.info( + self.log.info( "FINAL STATE: soak job %s completed with : %s at %s", job_id, "CANCELLED",