diff --git a/src/tests/ftest/util/soak_test_base.py b/src/tests/ftest/util/soak_test_base.py
index ded9233d861..05ac292b65c 100644
--- a/src/tests/ftest/util/soak_test_base.py
+++ b/src/tests/ftest/util/soak_test_base.py
@@ -26,7 +26,7 @@
     create_app_cmdline, create_dm_cmdline, create_fio_cmdline, create_ior_cmdline,
     create_macsio_cmdline, create_mdtest_cmdline, create_racer_cmdline, ddhhmmss_format,
     debug_logging, get_daos_server_logs,
-    get_harassers, get_id, get_job_logs, get_journalctl_logs,
+    get_harassers, get_id, get_job_logs, get_journalctl_logs, job_cleanup,
     launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot,
     launch_server_stop_start, launch_snapshot, launch_vmd_identify_check, reserved_file_copy,
     run_event_check, run_metrics_check, run_monitor_check)
@@ -146,7 +146,8 @@ def pre_tear_down(self):
         if self.all_failed_jobs:
             errors.append("SOAK FAILED: The following jobs failed {} ".format(
                 " ,".join(str(j_id) for j_id in self.all_failed_jobs)))
-
+        # cleanup any remaining jobs
+        job_cleanup(self.log, self.hostlist_clients)
         # verify reserved container data
         if self.resv_cont:
             final_resv_file = os.path.join(self.test_dir, "final", "resv_file")
@@ -313,8 +314,9 @@ def schedule_jobs(self):
         for job_dict in self.joblist:
             jobid_list.append(job_dict["jobid"])
         self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}")
+        jobs_not_done = jobid_list.copy()
         while True:
-            if time.time() > self.end_time or len(jobid_list) == 0:
+            if time.time() > self.end_time or len(jobs_not_done) == 0:
                 break
             jobs = []
             job_results = {}
@@ -343,7 +345,8 @@ def schedule_jobs(self):
                         params = (self.log, job_queue, job_id, node_list[:node_count],
                                   env, script, log, error_log, timeout, self)
                         name = f"SOAK JOB {job_id}"
-                        jobs.append(threading.Thread(target=method, args=params, name=name))
+                        jobs.append(threading.Thread(
+                            target=method, args=params, name=name, daemon=True))
                         jobid_list.remove(job_id)
                         node_list = node_list[node_count:]
                         debug_logging(
@@ -353,13 +356,14 @@ def schedule_jobs(self):
             # run job scripts on all available nodes
             for job in jobs:
                 job.start()
-            debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started")
-            for job in jobs:
-                job.join()
-            debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined")
+            # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started")
+            # for job in jobs:
+            #     job.join()
+            # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined")
             while not job_queue.empty():
                 job_results = job_queue.get()
                 # Results to return in queue
+                jobs_not_done.remove(job_results["handle"])
                 node_list.update(job_results["host_list"])
                 debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results")
                 self.soak_results[job_results["handle"]] = job_results["state"]
@@ -474,7 +478,7 @@ def job_startup(self):
         # self.schedule_jobs()
         method = self.schedule_jobs
         name = "Job Scheduler"
-        scheduler = threading.Thread(target=method, name=name)
+        scheduler = threading.Thread(target=method, name=name, daemon=True)
         # scheduler = multiprocessing.Process(target=method, name=name)
         scheduler.start()
 
@@ -658,7 +662,7 @@ def run_soak(self, test_param):
         resv_bytes = self.params.get("resv_bytes", test_param + "*", 500000000)
         ignore_soak_errors = self.params.get("ignore_soak_errors", test_param + "*", False)
         self.enable_il = self.params.get("enable_intercept_lib", test_param + "*", False)
-        self.sudo_cmd = "sudo" if enable_sudo else ""
+        self.sudo_cmd = "sudo -n" if enable_sudo else ""
         self.enable_remote_logging = self.params.get(
             "enable_remote_logging", os.path.join(test_param, "*"), False)
         self.enable_scrubber = self.params.get(
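Note on the scheduling changes above: `schedule_jobs()` no longer `join()`s its worker threads; the threads are daemons, and completion is tracked by removing each job id from `jobs_not_done` as its result arrives on the queue, so a hung job script can no longer wedge the scheduler or test teardown. A minimal, self-contained sketch of the pattern (`worker` and `schedule` are illustrative stand-ins, not the test code):

```python
import queue
import threading
import time


def worker(job_id, results):
    """Stand-in for launch_jobscript(): do some work, then report back."""
    time.sleep(0.1)
    results.put({"handle": job_id, "state": "COMPLETED"})


def schedule(job_ids, deadline):
    """Daemon threads plus a drained result queue, instead of join()."""
    results = queue.Queue()
    jobs_not_done = job_ids.copy()  # completion tracked separately from submission
    while time.time() < deadline and jobs_not_done:
        for job_id in job_ids:
            threading.Thread(target=worker, args=(job_id, results), daemon=True).start()
        job_ids = []  # everything submitted; the real loop resubmits as nodes free up
        time.sleep(0.2)
        while not results.empty():
            jobs_not_done.remove(results.get()["handle"])
    return jobs_not_done  # whatever is left here never reported back


print("unfinished:", schedule(list(range(4)), time.time() + 5))
```

One detail the sketch makes visible: `jobs_not_done` must be a copy of the submission list. If both names alias one list, each id is removed once at submission and again on completion, raising `ValueError` and breaking the exit condition.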
diff --git a/src/tests/ftest/util/soak_utils.py b/src/tests/ftest/util/soak_utils.py
index 75486db17e8..6865f16fb15 100644
--- a/src/tests/ftest/util/soak_utils.py
+++ b/src/tests/ftest/util/soak_utils.py
@@ -5,6 +5,7 @@
 """
 # pylint: disable=too-many-lines
+import getpass
 import os
 import random
 import re
 import stat
@@ -16,6 +17,7 @@
 from avocado.core.exceptions import TestFail
 from avocado.utils.distro import detect
 from ClusterShell.NodeSet import NodeSet
+from command_utils import command_as_user
 from command_utils_base import EnvironmentVariables
 from daos_racer_utils import DaosRacerCommand
 from data_mover_utils import DcpCommand, FsCopy
@@ -367,7 +369,8 @@ def wait_for_pool_rebuild(self, pool, name):
     """
     rebuild_status = False
     self.log.info("<<Wait for %s rebuild on %s>> at %s", name, pool.identifier, time.ctime())
-    self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False)
+    # TODO - create yaml param to enable/disable rebuild debug logging
+    # self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False)
     try:
         # # Wait for rebuild to start
         # pool.wait_for_rebuild_to_start()
@@ -380,10 +383,34 @@ def wait_for_pool_rebuild(self, pool, name):
     except TestFail as error1:
         self.log.error(
             f"<<<FAILED: {name} rebuild failed", exc_info=error1)
     return rebuild_status
 
 
+def job_cleanup(log, hosts):
+    """Cleanup after job is done.
+
+    Args:
+        log (logger): logger for the messages produced by this method
+        hosts (NodeSet): nodes to clean up
+    """
+    current_user = getpass.getuser()
+    for job in ["mpirun", "palsd", "dfuse"]:
+        cmd = [f"/usr/bin/bash -c 'for pid in $(pgrep -u {current_user} {job})",
+               "do kill -HUP $pid",
+               "done'"]
+        run_remote(
+            log, hosts, ";".join(cmd), verbose=False, timeout=600, task_debug=False, stderr=False)
+        if job == "dfuse":
+            cmd2 = [
+                "/usr/bin/bash -c 'for dir in $(find /tmp/soak_dfuse_*/)",
+                "do fusermount3 -uz $dir",
+                "rm -rf $dir",
+                "done'"]
+            run_remote(log, hosts, ";".join(cmd2), verbose=False, timeout=600, task_debug=False,
+                       stderr=False)
+
+
 def launch_jobscript(
         log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test):
     """Launch the job script on remote node.
 
@@ -427,4 +454,4 @@ def launch_jobscript(
-    cmd = ";".join([env, f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}"])
+    cmd = ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"])
     job_results = run_remote(
         log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False)
     if job_results:
@@ -435,6 +462,8 @@ def launch_jobscript(
             state = "FAILED"
         else:
             state = "UNKNOWN"
+        # attempt to cleanup any leftover job processes on timeout
+        job_cleanup(log, hosts)
     if time.time() >= test.end_time:
         results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list}
         debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript")
@@ -625,16 +654,16 @@ def launch_reboot(self, pools, name, results, args):
         self.log.info(
             "<<<PASS %s: %s started on ranks %s at %s>>>\n", self.loop, name, ranks, time.ctime())
         # reboot host in 1 min
-        result = run_remote(self.log, reboot_host, "sudo shutdown -r +1")
+        result = run_remote(self.log, reboot_host, command_as_user("shutdown -r +1", "root"))
         if result.passed:
             status = True
         else:
-            self.log.error(f"<<<FAILED: {name} - {reboot_host} failed to issue reboot")
             status = False
         self.log.info(
             "<<<PASS %s: %s started on host %s at %s>>>\n", self.loop, name, reboot_host,
             time.ctime())
-        cmd_results = run_remote(self.log, reboot_host, "sudo systemctl restart daos_server")
+        cmd_results = run_remote(
+            self.log, reboot_host, command_as_user("systemctl restart daos_server", "root"))
         if cmd_results.passed:
             self.dmg_command.system_query()
             for pool in pools:
@@ -983,9 +1013,10 @@ def start_dfuse(self, pool, container, name=None, job_spec=None):
         self.soak_log_dir,
         self.test_name + "_" + name + "_`hostname -s`_"
         "" + "${JOB_ID}_" + "daos_dfuse.log")
-    dfuse_env = ";".join(["export D_LOG_FILE_APPEND_PID=1",
-                          "export D_LOG_MASK=ERR",
-                          f"export D_LOG_FILE={dfuselog}"])
+    dfuse_env = ";".join(
+        ["export D_LOG_FILE_APPEND_PID=1",
+         "export D_LOG_MASK=ERR",
+         f"export D_LOG_FILE={dfuselog}"])
     module_load = ";".join([f"module use {self.mpi_module_use}",
                             f"module load {self.mpi_module}"])
     dfuse_start_cmds = [
@@ -1015,8 +1046,8 @@ def stop_dfuse(dfuse, vol=False):
                 dfuse.mount_dir.value)])
 
     dfuse_stop_cmds.extend([
-        "clush -S -w $HOSTLIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value),
-        "clush -S -w $HOSTLIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)])
+        f'clush -S -w $HOSTLIST "fusermount3 -uz {dfuse.mount_dir.value}"',
+        f'clush -S -w $HOSTLIST "rm -rf {dfuse.mount_dir.value}"'])
     return dfuse_stop_cmds
 
 
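Both files also move from bare `sudo` to non-interactive forms: the `sudo -n` soak parameter in the first file, and `command_as_user(..., "root")` from `command_utils` for the reboot/restart commands here. With `-n`, sudo exits immediately when passwordless sudo is not configured instead of hanging the harasser on a password prompt. A rough sketch of the idea (the `as_root` helper is invented for illustration; the real `command_as_user` is more general, handling arbitrary users):

```python
import shlex
import subprocess


def as_root(command):
    """Prefix a command with non-interactive sudo: 'sudo -n' fails fast
    rather than blocking on a password prompt."""
    return f"sudo -n {command}"


result = subprocess.run(shlex.split(as_root("systemctl is-active daos_server")),
                        capture_output=True, text=True, check=False)
print(result.returncode, result.stdout.strip() or result.stderr.strip())
```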
@@ -1473,14 +1504,14 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob):
     # ${DAOS_TEST_APP_SRC}/suse => apps built with suse and gnu-mpich
     # pylint: disable-next=wrong-spelling-in-comment,fixme
     # ${DAOS_TEST_APP_SRC}/suse/intelmpi => apps built with suse and intelmpi
-    if "suse" in detect().name.lower():
+    if "suse" in detect().name.lower() and os.environ.get("DAOS_TEST_MODE") is None:
         os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse")
-    if "mpi/latest" in mpi_module:
+    if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None:
         os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi")
         os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0"
     app_cmd = os.path.expandvars(self.params.get("cmdline", app_params, default=None))
     if app_cmd is None:
-        self.log.info(f"<<{job_spec} command line not specified in yaml; job will not be run>>")
+        self.log.info(f"<<{job_spec} command line not specified in yaml>>")
         return commands
     oclass_list = self.params.get("oclass", app_params)
     for file_oclass, dir_oclass in oclass_list:
@@ -1682,6 +1713,8 @@ def build_job_script(self, commands, job, nodesperjob, ppn):
     else:
         script_file.write("HOSTLIST=$1 \n")
         script_file.write("JOB_ID=$2 \n")
+        script_file.write("JOB_LOG=$3 \n")
+        script_file.write("JOB_ERROR_LOG=$4 \n")
         script_file.write("echo \"JOB NODES: \" $HOSTLIST \n")
         script_file.write("echo \"JOB ID: \" $JOB_ID \n")
         script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n")
@@ -1689,6 +1722,8 @@ def build_job_script(self, commands, job, nodesperjob, ppn):
         script_file.write("else \n")
         script_file.write(" source $VIRTUAL_ENV/bin/activate \n")
         script_file.write("fi \n")
+        script_file.write("exec 1> $JOB_LOG \n")
+        script_file.write("exec 2> $JOB_ERROR_LOG \n")
         for cmd in list(job_cmds):
             script_file.write(cmd + "\n")
 
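The `build_job_script()` additions pair with the `launch_jobscript()` command change earlier in this patch: the log paths now travel to the script as `$3`/`$4`, and the script redirects its own output with `exec`, so the remote command no longer needs `> joblog 2> errorlog` appended at the call site. A standalone illustration of the emitted pattern (the script body and paths are invented for the demo):

```python
import os
import stat
import subprocess
import tempfile

# Stripped-down version of the header that build_job_script() writes.
SCRIPT = """#!/bin/bash
HOSTLIST=$1
JOB_ID=$2
JOB_LOG=$3
JOB_ERROR_LOG=$4
# From here on, stdout and stderr land in the per-job logs.
exec 1> $JOB_LOG
exec 2> $JOB_ERROR_LOG
echo "JOB NODES: $HOSTLIST"
echo "JOB ID: $JOB_ID"
"""

with tempfile.NamedTemporaryFile("w", suffix=".sh", delete=False) as handle:
    handle.write(SCRIPT)
os.chmod(handle.name, stat.S_IRWXU)
subprocess.run([handle.name, "node-[1-2]", "42", "/tmp/job.log", "/tmp/job.err"], check=True)
with open("/tmp/job.log", encoding="utf-8") as log:
    print(log.read(), end="")
```

Because the `exec` redirections are written after the virtual-environment block, activation failures still surface on the launcher's own stderr rather than disappearing into the job's error log.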