Commit 61e7422
Add jobs_not_done to track completed jobs
Skip-unit-tests: true
Skip-fault-injection-test: true
Test-tag: soak_smoke

Signed-off-by: Maureen Jean <maureen.jean@intel.com>
mjean308 committed Aug 7, 2024
1 parent 8acf486 commit 61e7422
Showing 2 changed files with 66 additions and 27 deletions.
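The heart of the change is in schedule_jobs(): job threads are now daemonic and are no longer joined; instead the scheduler keeps a jobs_not_done list and loops until the soak end time is reached or every submitted job has reported a result through the job queue. A minimal, illustrative sketch of that pattern (schedule() and launch() are placeholder names, not the framework's API):

```python
# Sketch only: simplified from the schedule_jobs() change in this commit.
# launch() stands in for launch_jobscript(); the real method also handles
# node allocation, per-job scripts, and debug logging.
import queue
import threading
import time


def schedule(jobid_list, launch, end_time):
    """Submit jobs and wait until every handle reports back or end_time passes."""
    job_queue = queue.Queue()
    jobs_not_done = list(jobid_list)   # handles that have not returned a result yet
    to_submit = list(jobid_list)       # handles still waiting to be started
    soak_results = {}
    while time.time() <= end_time and jobs_not_done:
        for job_id in list(to_submit):
            threading.Thread(target=launch, args=(job_queue, job_id),
                             name=f"SOAK JOB {job_id}", daemon=True).start()
            to_submit.remove(job_id)
        while not job_queue.empty():
            result = job_queue.get()   # e.g. {"handle": 12, "state": "COMPLETED"}
            jobs_not_done.remove(result["handle"])
            soak_results[result["handle"]] = result["state"]
        time.sleep(1)
    return soak_results
```

Because the worker threads are daemonic and never joined, a job script that hangs no longer blocks the scheduler; the soak end time and the emptied jobs_not_done list are the only exit conditions.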
24 changes: 14 additions & 10 deletions src/tests/ftest/util/soak_test_base.py
@@ -26,7 +26,7 @@
create_app_cmdline, create_dm_cmdline, create_fio_cmdline,
create_ior_cmdline, create_macsio_cmdline, create_mdtest_cmdline,
create_racer_cmdline, ddhhmmss_format, debug_logging, get_daos_server_logs,
- get_harassers, get_id, get_job_logs, get_journalctl_logs,
+ get_harassers, get_id, get_job_logs, get_journalctl_logs, job_cleanup,
launch_exclude_reintegrate, launch_extend, launch_jobscript, launch_reboot,
launch_server_stop_start, launch_snapshot, launch_vmd_identify_check,
reserved_file_copy, run_event_check, run_metrics_check, run_monitor_check)
@@ -146,7 +146,8 @@ def pre_tear_down(self):
if self.all_failed_jobs:
errors.append("SOAK FAILED: The following jobs failed {} ".format(
" ,".join(str(j_id) for j_id in self.all_failed_jobs)))

# cleanup any remaining jobs
job_cleanup(self.log, self.hostlist_clients)
# verify reserved container data
if self.resv_cont:
final_resv_file = os.path.join(self.test_dir, "final", "resv_file")
@@ -313,8 +314,9 @@ def schedule_jobs(self):
for job_dict in self.joblist:
jobid_list.append(job_dict["jobid"])
self.log.info(f"Submitting {len(jobid_list)} jobs at {time.ctime()}")

jobs_not_done = jobid_list
while True:
- if time.time() > self.end_time or len(jobid_list) == 0:
+ if time.time() > self.end_time or len(jobs_not_done) == 0:
break
jobs = []
job_results = {}
@@ -343,7 +345,8 @@ def schedule_jobs(self):
env, script, log, error_log, timeout, self)
name = f"SOAK JOB {job_id}"

- jobs.append(threading.Thread(target=method, args=params, name=name))
+ jobs.append(threading.Thread(
+     target=method, args=params, name=name, daemon=True))
jobid_list.remove(job_id)
node_list = node_list[node_count:]
debug_logging(
@@ -353,13 +356,14 @@ def schedule_jobs(self):
# run job scripts on all available nodes
for job in jobs:
job.start()
- debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started")
- for job in jobs:
-     job.join()
- debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined")
+ # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs started")
+ # for job in jobs:
+ #     job.join()
+ # debug_logging(self.log, self.enable_debug_msg, "DBG: all jobs joined")
while not job_queue.empty():
job_results = job_queue.get()
# Results to return in queue
jobs_not_done.remove(job_results["handle"])
node_list.update(job_results["host_list"])
debug_logging(self.log, self.enable_debug_msg, "DBG: Updating soak results")
self.soak_results[job_results["handle"]] = job_results["state"]
@@ -474,7 +478,7 @@ def job_startup(self):
# self.schedule_jobs()
method = self.schedule_jobs
name = "Job Scheduler"
- scheduler = threading.Thread(target=method, name=name)
+ scheduler = threading.Thread(target=method, name=name, daemon=True)
# scheduler = multiprocessing.Process(target=method, name=name)
scheduler.start()

@@ -658,7 +662,7 @@ def run_soak(self, test_param):
resv_bytes = self.params.get("resv_bytes", test_param + "*", 500000000)
ignore_soak_errors = self.params.get("ignore_soak_errors", test_param + "*", False)
self.enable_il = self.params.get("enable_intercept_lib", test_param + "*", False)
- self.sudo_cmd = "sudo" if enable_sudo else ""
+ self.sudo_cmd = "sudo -n" if enable_sudo else ""
self.enable_remote_logging = self.params.get(
"enable_remote_logging", os.path.join(test_param, "*"), False)
self.enable_scrubber = self.params.get(
69 changes: 52 additions & 17 deletions src/tests/ftest/util/soak_utils.py
@@ -5,6 +5,7 @@
"""
# pylint: disable=too-many-lines

import getpass
import os
import random
import re
@@ -16,6 +17,7 @@
from avocado.core.exceptions import TestFail
from avocado.utils.distro import detect
from ClusterShell.NodeSet import NodeSet
from command_utils import command_as_user
from command_utils_base import EnvironmentVariables
from daos_racer_utils import DaosRacerCommand
from data_mover_utils import DcpCommand, FsCopy
@@ -367,7 +369,8 @@ def wait_for_pool_rebuild(self, pool, name):
"""
rebuild_status = False
self.log.info("<<Wait for %s rebuild on %s>> at %s", name, pool.identifier, time.ctime())
self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False)
# TODO - create yaml param to enable/disable rebuild debug logging

+ # self.dmg_command.server_set_logmasks("DEBUG", raise_exception=False)
try:
# # Wait for rebuild to start
# pool.wait_for_rebuild_to_start()
@@ -380,10 +383,34 @@ def wait_for_pool_rebuild(self, pool, name):
except TestFail as error1:
self.log.error(
f"<<<FAILED: {name} rebuild failed due to test issue: {error1}", exc_info=error1)
- self.dmg_command.server_set_logmasks(raise_exception=False)
+ # self.dmg_command.server_set_logmasks(raise_exception=False)
return rebuild_status


def job_cleanup(log, hosts):
"""Cleanup after job is done.
Args:
log (logger): logger for the messages produced by this method
hosts (list): list of node to pass to job script
"""
current_user = getpass.getuser()
for job in ["mpirun", "palsd", "dfuse"]:
cmd = [f"/usr/bin/bash -c 'for pid in $(pgrep -u {current_user} {job})",
"do kill -HUP $pid",
"done'"]
run_remote(
log, hosts, ";".join(cmd), verbose=False, timeout=600, task_debug=False, stderr=False)
if job == "dfuse":
cmd2 = [
"/usr/bin/bash -c 'for dir in $(find /tmp/soak_dfuse_*/)",
"do fusermount3 -uz $dir",
"rm -rf $dir",
"done'"]
run_remote(log, hosts, ";".join(cmd2), verbose=False, timeout=600, task_debug=False,
stderr=False)


def launch_jobscript(
log, job_queue, job_id, host_list, env, script, job_log, error_log, timeout, test):
"""Launch the job script on remote node.
@@ -423,7 +450,7 @@ def launch_jobscript(
error_log1 = error_log.replace("JOBID", str(job_id))
joblog = job_log1.replace("RHOST", str(rhost))
errorlog = error_log1.replace("RHOST", str(rhost))
cmd = ";".join([env, f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}"])
cmd = ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"])
job_results = run_remote(
log, rhost, cmd, verbose=False, timeout=timeout * 60, task_debug=False, stderr=False)
if job_results:
@@ -435,6 +462,8 @@ def launch_jobscript(
state = "FAILED"
else:
state = "UNKNOWN"
# attempt to cleanup any leftover job processes on timeout
job_cleanup(log, hosts)
if time.time() >= test.end_time:
results = {"handle": job_id, "state": "CANCELLED", "host_list": host_list}
debug_logging(log, test.enable_debug_msg, f"DBG: JOB {job_id} EXITED launch_jobscript")
@@ -625,16 +654,16 @@ def launch_reboot(self, pools, name, results, args):
self.log.info(
"<<<PASS %s: %s started on ranks %s at %s >>>\n", self.loop, name, ranks, time.ctime())
# reboot host in 1 min
- result = run_remote(self.log, reboot_host, "sudo shutdown -r +1")
+ result = run_remote(self.log, reboot_host, command_as_user("shutdown -r +1", "root"))
if result.passed:
status = True
else:
self.log.error(f"<<<FAILED:{name} - {reboot_host} failed to issue reboot")
self.log.error(f"<<<FAILED: {name} - {reboot_host} failed to issue reboot")
status = False

if not wait_for_result(self.log, check_ping, 90, 5, True, host=reboot_host,
expected_ping=False, cmd_timeout=60, verbose=True):
self.log.error(f"<<<FAILED:{name} - {reboot_host} failed to reboot")
self.log.error(f"<<<FAILED: {name} - {reboot_host} failed to reboot")
status = False

if status:
@@ -654,17 +683,18 @@ def launch_reboot(self, pools, name, results, args):
# wait for node to complete rebooting
if not wait_for_result(self.log, check_ping, 60, 5, True, host=reboot_host,
expected_ping=True, cmd_timeout=60, verbose=True):
self.log.error(f"<<<FAILED:{name} - {reboot_host} failed to reboot")
self.log.error(f"<<<FAILED: {name} - {reboot_host} failed to reboot")
status = False
if not wait_for_result(self.log, check_ssh, 120, 2, True, hosts=reboot_host,
cmd_timeout=30, verbose=True):
self.log.error(f"<<<FAILED:{name} - {reboot_host} failed to reboot")
self.log.error(f"<<<FAILED: {name} - {reboot_host} failed to reboot")
status = False
if status:
# issue a restart
self.log.info("<<<PASS %s: Issue systemctl restart daos_server on %s at %s>>>\n",
self.loop, name, reboot_host, time.ctime())
- cmd_results = run_remote(self.log, reboot_host, "sudo systemctl restart daos_server")
+ cmd_results = run_remote(
+     self.log, reboot_host, command_as_user("systemctl restart daos_server", "root"))
if cmd_results.passed:
self.dmg_command.system_query()
for pool in pools:
@@ -983,9 +1013,10 @@ def start_dfuse(self, pool, container, name=None, job_spec=None):
self.soak_log_dir,
self.test_name + "_" + name + "_`hostname -s`_"
"" + "${JOB_ID}_" + "daos_dfuse.log")
dfuse_env = ";".join(["export D_LOG_FILE_APPEND_PID=1",
"export D_LOG_MASK=ERR",
f"export D_LOG_FILE={dfuselog}"])
dfuse_env = ";".join(
["export D_LOG_FILE_APPEND_PID=1",
"export D_LOG_MASK=ERR",
f"export D_LOG_FILE={dfuselog}"])
module_load = ";".join([f"module use {self.mpi_module_use}", f"module load {self.mpi_module}"])

dfuse_start_cmds = [
@@ -1015,8 +1046,8 @@ def stop_dfuse(dfuse, vol=False):
dfuse.mount_dir.value)])

dfuse_stop_cmds.extend([
"clush -S -w $HOSTLIST \"fusermount3 -uz {0}\"".format(dfuse.mount_dir.value),
"clush -S -w $HOSTLIST \"rm -rf {0}\"".format(dfuse.mount_dir.value)])
f'clush -S -w $HOSTLIST "fusermount3 -uz {dfuse.mount_dir.value}"',
f'clush -S -w $HOSTLIST "rm -rf {dfuse.mount_dir.value}"'])
return dfuse_stop_cmds


@@ -1473,14 +1504,14 @@ def create_app_cmdline(self, job_spec, pool, ppn, nodesperjob):
# ${DAOS_TEST_APP_SRC}/suse => apps built with suse and gnu-mpich
# pylint: disable-next=wrong-spelling-in-comment,fixme
# ${DAOS_TEST_APP_SRC}/suse/intelmpi => apps built with suse and intelmpi
if "suse" in detect().name.lower():
if "suse" in detect().name.lower() and os.environ.get("DAOS_TEST_MODE") is None:
os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "suse")
if "mpi/latest" in mpi_module:
if "mpi/latest" in mpi_module and os.environ.get("DAOS_TEST_MODE") is None:
os.environ["DAOS_TEST_APP_DIR"] += os.path.join(os.sep, "intelmpi")
os.environ["I_MPI_OFI_LIBRARY_INTERNAL"] = "0"
app_cmd = os.path.expandvars(self.params.get("cmdline", app_params, default=None))
if app_cmd is None:
self.log.info(f"<<{job_spec} command line not specified in yaml; job will not be run>>")
self.log.info(f"<<{job_spec} command line not specified in yaml>>")
return commands
oclass_list = self.params.get("oclass", app_params)
for file_oclass, dir_oclass in oclass_list:
Expand Down Expand Up @@ -1682,13 +1713,17 @@ def build_job_script(self, commands, job, nodesperjob, ppn):
else:
script_file.write("HOSTLIST=$1 \n")
script_file.write("JOB_ID=$2 \n")
script_file.write("JOB_LOG=$3 \n")
script_file.write("JOB_ERROR_LOG=$4 \n")
script_file.write("echo \"JOB NODES: \" $HOSTLIST \n")
script_file.write("echo \"JOB ID: \" $JOB_ID \n")
script_file.write("if [ -z \"$VIRTUAL_ENV\" ]; then \n")
script_file.write(" echo \"VIRTUAL_ENV not defined\" \n")
script_file.write("else \n")
script_file.write(" source $VIRTUAL_ENV/bin/activate \n")
script_file.write("fi \n")
script_file.write("exec 1> $JOB_LOG \n")
script_file.write("exec 2> $JOB_ERROR_LOG \n")

for cmd in list(job_cmds):
script_file.write(cmd + "\n")
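A second recurring change across both files is how per-job logs are captured: launch_jobscript no longer redirects stdout/stderr on the remote command line, it passes the log paths to the generated job script as positional arguments, and build_job_script makes the script redirect itself with exec. A hedged sketch of how the two halves fit together (build_remote_command and write_script_header are assumed helper names, not the repository's API):

```python
# Sketch only: mirrors the launch_jobscript() and build_job_script() changes above.
def build_remote_command(env, script, hosts, job_id, joblog, errorlog):
    # previously: f"{script} {hosts} {job_id} > {joblog} 2> {errorlog}"
    # now the log paths travel as the script's $3 and $4
    return ";".join([env, f"{script} {hosts} {job_id} {joblog} {errorlog}"])


def write_script_header(script_file):
    # the generated script claims its own stdout/stderr instead of relying on
    # redirection by the shell that launched it
    for line in ("HOSTLIST=$1", "JOB_ID=$2", "JOB_LOG=$3", "JOB_ERROR_LOG=$4",
                 "exec 1> $JOB_LOG", "exec 2> $JOB_ERROR_LOG"):
        script_file.write(line + "\n")
```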
