Merge pull request #183 from SCM-NV/DavidOrmrodMorley/fix_multijob_locking

Improve Exception Handling in Pre and Post Run Methods SCMSUITE-10131 SO107
dormrod authored Dec 11, 2024
2 parents dcc9d11 + d906581 commit d8a857d
Showing 10 changed files with 358 additions and 45 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ This changelog is effective from the 2025 releases.
* Build using `pyproject.toml`, addition of extras groups to install optional dependencies
* Logging of job summaries to CSV logfile
* Logging of AMS job error messages to stdout and logfile on job failure
* Method `get_errormsg` enforced on the `Job` base class, with a default implementation

### Changed
* Functions for optional packages (e.g. RDKit, ASE) are available even when these packages are not installed, but will raise a `MissingOptionalPackageError` when called
@@ -33,12 +34,15 @@ This changelog is effective from the 2025 releases.
* Restructuring of examples and conversion of various examples to notebooks
* Support for `networkx>=3` and `ase>=3.23`
* Use standard library logger for `log` function
* Make `Job` class inherit from `ABC` and mark abstract methods
* Exceptions raised in `prerun` and `postrun` will always be caught and will populate the error message

### Fixed
* `Molecule.properties.charge` is a numeric instead of string type when loading molecule from a file
* `Molecule.delete_all_bonds` removes the reference molecule from the removed bond instances
* `SingleJob.load` returns the correctly loaded job
* `AMSJob.check` handles a `NoneType` status, returning `False`
* `MultiJob.run` locking resolved when errors are raised within `prerun` and `postrun` methods

### Deprecated
* `plams` launch script is deprecated in favour of simply running with `amspython`
71 changes: 55 additions & 16 deletions core/basejob.py
@@ -5,8 +5,10 @@
import time
from os.path import join as opj
from typing import TYPE_CHECKING, Dict, Generator, Iterable, List, Optional, Union
from abc import ABC, abstractmethod
import traceback

from scm.plams.core.enums import JobStatus
from scm.plams.core.enums import JobStatus, JobStatusType
from scm.plams.core.errors import FileError, JobError, PlamsError, ResultsError
from scm.plams.core.functions import config, log
from scm.plams.core.private import sha256
@@ -28,7 +30,27 @@
__all__ = ["SingleJob", "MultiJob"]


class Job:
def _fail_on_exception(func):
"""Decorator to wrap a job method and mark the job as failed on any exception."""

def wrapper(self: "Job", *args, **kwargs):
try:
return func(self, *args, **kwargs)
except Exception:
# Mark job status as failed and the results as complete
self.status = JobStatus.FAILED
self.results.finished.set() # type: ignore
self.results.done.set() # type: ignore
# Notify any parent multi-job of the failure
if self.parent and self in self.parent: # type: ignore
self.parent._notify() # type: ignore
# Store the exception message to be accessed from get_errormsg
self._error_msg = traceback.format_exc()

return wrapper


class Job(ABC):
"""General abstract class for all kind of computational tasks.
Methods common for all kinds of jobs are gathered here. Instances of |Job| should never be created. It should not be subclassed either. If you wish to define a new type of job please subclass either |SingleJob| or |MultiJob|.
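
Since the diff only shows the wrapper itself, here is a self-contained toy sketch (plain `threading`, not the real Job/Results classes) of why the wrapper also sets the result events: anything blocked on the result, as a parent multi-job is for its children, would otherwise wait forever when `prerun` or `postrun` raises.

```python
import threading
import traceback

finished = threading.Event()
state = {"status": "running", "error": None}


def broken_prepare():
    """Stand-in for a decorated _prepare whose prerun raises."""
    try:
        raise RuntimeError("error in prerun")
    except Exception:
        state["status"] = "failed"
        state["error"] = traceback.format_exc()
        finished.set()  # release any waiter instead of leaving it locked


def waiter():
    """Stand-in for code blocked on the job results (e.g. a parent multi-job)."""
    finished.wait()
    print("job ended with status:", state["status"])


t = threading.Thread(target=waiter)
t.start()
broken_prepare()
t.join()  # terminates only because the failure path set the event
```
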
@@ -82,6 +104,7 @@ def __init__(
self.default_settings = [config.job]
self.depend = depend or []
self._dont_pickle: List[str] = []
self._error_msg: Optional[str] = None
if settings is not None:
if isinstance(settings, Settings):
self.settings = settings.copy()
@@ -101,14 +124,14 @@ def __init__(
# =======================================================================

@property
def status(self) -> JobStatus:
def status(self) -> JobStatusType:
"""
Current status of the job
"""
return self._status

@status.setter
def status(self, value: JobStatus) -> None:
def status(self, value: JobStatusType) -> None:
# This setter should really be private i.e. internally should use self._status
# But for backwards compatibility it is exposed and set by e.g. the JobManager
self._status = value
@@ -128,6 +151,7 @@ def run(
"""
if self.status != JobStatus.CREATED:
raise JobError("Trying to run previously started job {}".format(self.name))
self._error_msg = None
self.status = JobStatus.STARTED
self._log_status(1)

@@ -182,13 +206,24 @@ def ok(self, strict: bool = True) -> bool:
self.results.wait()
return self.status in [JobStatus.SUCCESSFUL, JobStatus.COPIED]

@abstractmethod
def check(self) -> bool:
"""Check if the execution of this instance was successful. Abstract method meant for internal use."""
raise PlamsError("Trying to run an abstract method Job.check()")
"""Check if the execution of this instance was successful."""

def get_errormsg(self) -> Optional[str]:
"""Tries to get an error message for a failed job. This method returns ``None`` for successful jobs."""
if self.check():
return None

return (
self._error_msg
if self._error_msg
else "Could not determine error message. Please check the output manually."
)
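
A minimal illustration of the new default `get_errormsg` behaviour, using a toy concrete subclass (the class and its trivial overrides are made up for the example; the fallback string is the one defined above).

```python
from scm.plams.core.basejob import Job


class ToyJob(Job):
    """Toy concrete subclass, just enough to instantiate the ABC."""

    def check(self) -> bool:
        return False  # pretend the job failed

    def hash(self):
        return None

    def _get_ready(self) -> None:
        pass

    def _execute(self, jobrunner) -> None:
        pass


job = ToyJob(name="toy")
print(job.get_errormsg())  # -> "Could not determine error message. Please check the output manually."

# What the _fail_on_exception wrapper stores when prerun/postrun raises:
job._error_msg = "RuntimeError: something went wrong"
print(job.get_errormsg())  # -> "RuntimeError: something went wrong"
```
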

@abstractmethod
def hash(self) -> Optional[str]:
"""Calculate the hash of this instance. Abstract method meant for internal use."""
raise PlamsError("Trying to run an abstract method Job.hash()")
"""Calculate the hash of this instance."""

def prerun(self) -> None: # noqa F811
"""Actions to take before the actual job execution.
@@ -204,6 +239,7 @@ def postrun(self) -> None:

# =======================================================================

@_fail_on_exception
def _prepare(self, jobmanager: "JobManager") -> bool:
"""Prepare the job for execution. This method collects steps 1-7 from :ref:`job-life-cycle`. Should not be overridden. Returned value indicates if job execution should continue (|RPM| did not find this job as previously run)."""

@@ -248,14 +284,15 @@ def _prepare(self, jobmanager: "JobManager") -> bool:
self._log_status(3)
return prev is None

@abstractmethod
def _get_ready(self) -> None:
"""Get ready for :meth:`~Job._execute`. This is the last step before :meth:`~Job._execute` is called. Abstract method."""
raise PlamsError("Trying to run an abstract method Job._get_ready()")
"""Get ready for :meth:`~Job._execute`. This is the last step before :meth:`~Job._execute` is called."""

@abstractmethod
def _execute(self, jobrunner: "JobRunner") -> None:
"""Execute the job. Abstract method."""
raise PlamsError("Trying to run an abstract method Job._execute()")
"""Execute the job."""

@_fail_on_exception
def _finalize(self) -> None:
"""Gather the results of the job execution and organize them. This method collects steps 9-12 from :ref:`job-life-cycle`. Should not be overridden."""
log("Starting {}._finalize()".format(self.name), 7)
@@ -333,15 +370,16 @@ def __init__(self, molecule: Optional[Molecule] = None, **kwargs):
Job.__init__(self, **kwargs)
self.molecule = molecule.copy() if isinstance(molecule, Molecule) else molecule

@abstractmethod
def get_input(self) -> str:
"""Generate the input file. Abstract method.
"""Generate the input file.
This method should return a single string with the full content of the input file. It should process information stored in the ``input`` branch of job settings and in the ``molecule`` attribute.
"""
raise PlamsError("Trying to run an abstract method SingleJob._get_input()")

@abstractmethod
def get_runscript(self) -> str:
"""Generate the runscript. Abstract method.
"""Generate the runscript.
This method should return a single string with the runscript contents. It can process information stored in ``runscript`` branch of job settings. In general the full runscript has the following form::
@@ -355,7 +393,6 @@ def get_runscript(self) -> str:
When overridden, this method should pay attention to ``.runscript.stdout_redirect`` key in job's ``settings``.
"""
raise PlamsError("Trying to run an abstract method SingleJob._get_runscript()")

def hash_input(self) -> str:
"""Calculate SHA256 hash of the input file."""
@@ -433,6 +470,7 @@ def _get_ready(self) -> None:

os.chmod(runfile, os.stat(runfile).st_mode | stat.S_IEXEC)

@_fail_on_exception
def _execute(self, jobrunner) -> None:
"""Execute previously created runscript using *jobrunner*.
@@ -661,6 +699,7 @@ def _notify(self) -> None:
with self._lock:
self._active_children -= 1

@_fail_on_exception
def _execute(self, jobrunner: "JobRunner") -> None:
"""Run all children from ``children``. Then use :meth:`~MultiJob.new_children` and run all jobs produced by it. Repeat this procedure until :meth:`~MultiJob.new_children` returns an empty list. Wait for all started jobs to finish."""
log("Starting {}._execute()".format(self.name), 7)
22 changes: 21 additions & 1 deletion core/enums.py
@@ -1,3 +1,5 @@
from typing import Literal, Union

# Import StrEnum for Python >=3.11, otherwise use backwards compatible class
try:
from enum import StrEnum # type: ignore
@@ -13,7 +15,7 @@ def __str__(self) -> str:
return str(self.value)


__all__ = ["JobStatus"]
__all__ = ["JobStatus", "JobStatusType"]


class JobStatus(StrEnum):
@@ -32,3 +34,21 @@ class JobStatus(StrEnum):
COPIED = "copied"
PREVIEW = "preview"
DELETED = "deleted"


JobStatusType = Union[
JobStatus,
Literal[
"created",
"started",
"registered",
"running",
"finished",
"crashed",
"failed",
"successful",
"copied",
"preview",
"deleted",
],
]
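
The new alias mainly helps type checkers: a status annotated as `JobStatusType` may be either a `JobStatus` member or its plain-string spelling. A small sketch (the helper function is made up for illustration):

```python
from scm.plams.core.enums import JobStatus, JobStatusType


def is_terminal(status: JobStatusType) -> bool:
    """Accepts the enum member or the equivalent literal string."""
    return status in (JobStatus.SUCCESSFUL, JobStatus.FAILED, JobStatus.CRASHED, JobStatus.DELETED)


assert is_terminal(JobStatus.FAILED)
assert is_terminal("failed")  # StrEnum members compare equal to their string values
```
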
11 changes: 2 additions & 9 deletions core/formatters.py
@@ -38,15 +38,8 @@ def _format_job(job: Job) -> Dict[str, Any]:

if job.status not in [JobStatus.REGISTERED, JobStatus.RUNNING]:
message.update({"job_ok": job.ok()})
try:
message.update({"job_check": job.check()})
except TypeError:
pass
try:
# this one it is not supported by the Job class but on many jobs they have it implemented
message.update({"job_get_errormsg": job.get_errormsg()})
except (AttributeError, TypeError):
pass
message.update({"job_check": job.check()})
message.update({"job_get_errormsg": job.get_errormsg()})

if job.parent:
message["job_parent_name"] = job.parent.name
7 changes: 1 addition & 6 deletions core/jobrunner.py
@@ -140,12 +140,7 @@ def _run_job(self, job, jobmanager):
try:
# Log any error messages to the standard logger
if not job.ok(False) or not job.check():
# get_errormsg is not required by the base job class, but often implemented by convention
err_msg = (
job.get_errormsg()
if hasattr(job, "get_errormsg")
else "Could not determine error message. Please check the output manually."
)
err_msg = job.get_errormsg()
err_lines = err_msg.splitlines()
max_lines = 30
if len(err_lines) > max_lines:
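
The hunk is cut off just before the truncation branch; as a hedged sketch of what the logging path does with long messages (the helper name and the exact wording of the truncation notice are assumptions):

```python
def truncate_error_message(err_msg: str, max_lines: int = 30) -> str:
    """Keep only the first max_lines lines of a long error message for logging."""
    lines = err_msg.splitlines()
    if len(lines) > max_lines:
        dropped = len(lines) - max_lines
        lines = lines[:max_lines] + [f"... ({dropped} more lines truncated)"]
    return "\n".join(lines)


# Example: a 100-line traceback is reduced to 30 lines plus a truncation notice.
print(truncate_error_message("\n".join(f"line {i}" for i in range(100))))
```
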
22 changes: 16 additions & 6 deletions interfaces/adfsuite/ams.py
@@ -1802,8 +1802,8 @@ def get_density_along_axis(

start_step, end_step, every, _ = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, None)
nEntries = self.readrkf("History", "nEntries")
coords = np.array(self.get_history_property("Coords")).reshape(nEntries, -1, 3)
coords = coords[start_step:end_step:every]
history_coords = np.array(self.get_history_property("Coords")).reshape(nEntries, -1, 3)
coords = history_coords[start_step:end_step:every]
nEntries = len(coords)

axis2index = {"x": 0, "y": 1, "z": 2}
@@ -2483,10 +2483,14 @@ def get_errormsg(self) -> Optional[str]:
if self.check():
return None
else:
# Check if there is an error captured during the job process, or a previously cached error
if self._error_msg:
return self._error_msg

default_msg = "Could not determine error message. Please check the output manually."
msg = None
try:
# Something went wrong. The first place to check is the termination status on the ams.rkf.
# If not, the first place to check is the termination status on the ams.rkf.
# If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there.
# Status can be:
# - NORMAL TERMINATION with errors: find the error from the ams log file
@@ -2503,7 +2507,8 @@
try:
log_err_lines = self.results.grep_file("ams.log", "ERROR: ")
if log_err_lines:
return log_err_lines[-1].partition("ERROR: ")[2]
self._error_msg: Optional[str] = log_err_lines[-1].partition("ERROR: ")[2]
return self._error_msg
except FileError:
pass

@@ -2517,7 +2522,8 @@
match=1,
)
if license_err_lines:
return str.join("\n", license_err_lines)
self._error_msg = str.join("\n", license_err_lines)
return self._error_msg
except FileError:
pass

@@ -2537,7 +2543,11 @@
break
except:
pass
return msg if msg else default_msg

# Cache error message if called again
self._error_msg = msg if msg else default_msg

return self._error_msg

def hash_input(self) -> str:
"""Calculate the hash of the input file.
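
The revised `AMSJob.get_errormsg` above first returns any message captured during `prerun`/`postrun`, then falls back to the `ams.rkf` termination status, `ERROR:` lines in `ams.log` and license errors, and finally caches whatever it found. A usage sketch (the results path is hypothetical):

```python
from scm.plams import AMSJob

failed = AMSJob.load_external("path/to/failed/ams.results")  # hypothetical path
first = failed.get_errormsg()   # greps ams.rkf / ams.log on this first call
again = failed.get_errormsg()   # returns the cached _error_msg without re-grepping the files
assert first == again
```
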
4 changes: 2 additions & 2 deletions mol/molecule.py
@@ -1487,7 +1487,7 @@ def get_fragment(self, indices):

return ret

def get_complete_molecules_within_threshold(self, atom_indices, threshold: float):
def get_complete_molecules_within_threshold(self, atom_indices: List[int], threshold: float):
"""
Returns a new molecule containing complete submolecules for any molecules
that are closer than ``threshold`` to any of the atoms in ``atom_indices``.
@@ -1509,7 +1509,7 @@ def get_complete_molecules_within_threshold(self, atom_indices, threshold: float
D = distance_array(solvated_coords, solvated_coords)[zero_based_indices]
less_equal = np.less_equal(D, threshold)
within_threshold = np.any(less_equal, axis=0)
good_indices = [i for i, value in enumerate(within_threshold) if value]
good_indices = [i for i, value in enumerate(within_threshold) if value] # type: ignore

complete_indices: Set[int] = set()
for indlist in molecule_indices:
7 changes: 7 additions & 0 deletions unit_tests/test_amsjob.py
@@ -569,6 +569,7 @@ def test_get_errormsg_populates_correctly_for_different_scenarios(self):

# Invalid license
job = AMSJob()
job._error_msg = None
results = MagicMock(spec=AMSResults)
results.readrkf.side_effect = FileError()
results.grep_file.side_effect = FileError()
@@ -595,6 +596,7 @@
)

# Invalid input
job._error_msg = None
results.grep_file.side_effect = None
results.grep_file.return_value = [
'<Dec05-2024> <12:03:49> ERROR: Input error: value "foo" found in line 13 for multiple choice key "Task" is not an allowed choice'
@@ -605,9 +607,14 @@
)

# Error in calculation
job._error_msg = None
results.readrkf.return_value = "NORMAL TERMINATION with errors"
results.readrkf.side_effect = None
results.grep_file.return_value = [
"<Dec05-2024> <13:44:55> ERROR: Geometry optimization failed! (Did not converge.)"
]
assert job.get_errormsg() == "Geometry optimization failed! (Did not converge.)"

# Error in prerun
job._error_msg = "RuntimeError: something went wrong"
assert job.get_errormsg() == "RuntimeError: something went wrong"