Database is not recovered when worker process throws an exception #1794

Open
zachmprince opened this issue Jul 23, 2024 · 2 comments
Labels
bug Something is wrong: Highest Priority

Comments

@zachmprince
Contributor

Description

The database is written in the fast path (a temporary directory) during a run, and at the end of the run the file is copied to the working directory. When an error is raised on the main process (context.MPI_RANK == 0), the database is still copied to the working directory. When an error is raised on a worker process, the database is not copied, and the file is lost.

This happens because the failing process calls context.MPI_COMM.Abort(errorcode=-1), which forces all of the other processes to abort immediately. If the main process is the first to fail, it copies the database back before calling the abort via DatabaseInterface::interactError(), i.e., a "graceful" failure and exit. However, if the failing process is not the main process, the main process is forced to abort before it gets a chance to copy the database back to the working directory. The MPI_COMM.Abort is called by ARMI's __main__.py:

context.MPI_COMM.Abort(errorcode=-1)

Additionally, the syncDbAfterWrite setting does not work around this: you can't copy an HDF5 file from one location to another while the file handle is open, and attempting to do so raises a PermissionError.
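For illustration, the failure mode looks roughly like the following (a minimal standalone sketch using h5py and shutil, not ARMI's database code; whether the copy actually fails depends on the OS and HDF5 file-locking behavior):

import os
import shutil
import tempfile

import h5py

fastPath = tempfile.mkdtemp()
workingDir = tempfile.mkdtemp()
srcDb = os.path.join(fastPath, "mydb.h5")

# Open an HDF5 file for writing, as the database writer does during a run.
db = h5py.File(srcDb, "w")
db["someData"] = [1, 2, 3]

try:
    # Copying while the write handle is still open can fail
    # (e.g., a PermissionError on Windows because of file locking).
    shutil.copy(srcDb, os.path.join(workingDir, "mydb.h5"))
except PermissionError as err:
    print(f"Cannot sync the database while it is open: {err}")
finally:
    db.close()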

Reproduction of Problem

Below is a script that forces an exception to be thrown during DatabaseInterface::interactEveryNode on a chosen rank. I will name this file runDatabaseWriteOnFailure.py.

import os
import sys
from unittest import mock
import argparse
import shutil

import armi
from armi import context
from armi import runLog
from armi.__main__ import main as armi_main
from armi.bookkeeping.db.databaseInterface import DatabaseInterface
from armi.tests import TEST_ROOT
from armi.utils import pathTools, directoryChangers
from armi import settings

if not armi.isConfigured():
    armi.configure()
context.Mode.setMode(context.Mode.BATCH)


class FailingDatabaseInterface(DatabaseInterface):
    """
    Mock interface that throws an exception during `interactEveryNode` on a
    prescribed rank at time-node 1.
    """

    failingRank = 0

    def interactEveryNode(self, cycle, node):
        if node == 1:
            context.MPI_COMM.bcast("fail", root=0)
            self._fail()
        super().interactEveryNode(cycle, node)

    def workerOperate(self, cmd):
        if cmd == "fail":
            self._fail()
            return True
        return False

    def _fail(self):
        if self.failingRank == context.MPI_RANK:
            runLog.error(f"Failing interface on processor {context.MPI_RANK}")
            raise RuntimeError("Failing interface critical worker failure")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-f",
        "--failingRank",
        type=int,
        default=0,
        help="Processor rank to throw exception on.",
    )
    args = parser.parse_args()

    # Move necessary files to new working directory
    workingDir = os.path.join(os.getcwd(), "failedDatabase")
    filesToMove = [
        "armiRun.yaml",
        "refSmallCoreGrid.yaml",
        "refSmallReactor.yaml",
        "refSmallReactorBase.yaml",
        "refSmallSfpGrid.yaml",
        "refSmallReactorShuffleLogic.py",
    ]
    pathTools.cleanPath(workingDir)
    if context.MPI_RANK == 0:
        os.makedirs(workingDir)
        for file in filesToMove:
            shutil.copy(os.path.join(TEST_ROOT, file), os.path.join(workingDir, file))

    # Change to working directory
    with directoryChangers.DirectoryChanger(workingDir, dumpOnException=False):
        # Create a new settings file with some modifications
        cs = settings.Settings("armiRun.yaml")
        newSettings = {
            "db": True,
            "nCycles": 1,
        }
        cs = cs.modified(newSettings=newSettings)
        cs.writeToYamlFile("failedDatabase.yaml")

        # Mock the DatabaseInterface with the one that will throw exception
        with mock.patch(
            "armi.bookkeeping.db.databaseInterface.DatabaseInterface",
            side_effect=FailingDatabaseInterface,
        ):
            # Prescribe the rank we want the interface to fail on
            FailingDatabaseInterface.failingRank = args.failingRank
            # Run ARMI main command as if it was done from command-line
            sys.argv = ["armi", "run", "failedDatabase.yaml"]
            armi_main()


if __name__ == "__main__":
    main()

To demonstrate the behavior, run the script as follows:

  1. Failing on main processor:

    mpiexec -n 2 python runDatabaseWriteOnFailure.py -f 0

    This produces an MPI_ABORT message, and an ls of failedDatabase/ shows that failedDatabase.h5 has been written.

  2. Failing on worker processor:

    mpiexec -n 2 python runDatabaseWriteOnFailure.py -f 1

    You will see a similar MPI_ABORT message, but an ls of failedDatabase/ shows that the .h5 file was not written.

Potential Solutions

To solve this issue, the failing worker needs to send a message to the main process indicating that it is failing, so the main process can output the database before MPI_ABORT is called. However, the main process needs to know when to receive this message. We can't simply probe for the message at prescribed locations, since a bcast/gather/scatter before the receive could cause the process to hang. With this in mind, I've come up with the following potential solutions.

Wrapper Around MPI_COMM

This solution involves a wrapper class around context.MPI_COMM. When an exception is thrown on a worker process, the worker sends a designated message to the main process indicating that it is failing. The wrapper probes for this message before every bcast/gather/scatter and throws an exception if the message is received. This is extremely intrusive and not future-proof: any communication pattern not supported by the wrapper bypasses the check.
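A rough sketch of what that wrapper might look like (FailureAwareComm is a hypothetical name and API, not a proposed ARMI class; only bcast is shown):

from mpi4py import MPI


class FailureAwareComm:
    """Hypothetical wrapper around an MPI communicator that checks for worker failures."""

    TAG_FAILURE = 93

    def __init__(self, comm):
        self._comm = comm

    def _checkForWorkerFailure(self):
        # Non-blocking probe: has any worker sent a failure notification?
        status = MPI.Status()
        if self._comm.iprobe(source=MPI.ANY_SOURCE, tag=self.TAG_FAILURE, status=status):
            self._comm.recv(source=status.Get_source(), tag=self.TAG_FAILURE)
            raise RuntimeError(f"Worker {status.Get_source()} reported a failure")

    def bcast(self, obj, root=0):
        # The main process checks for pending failure messages before every collective.
        if self._comm.rank == root:
            self._checkForWorkerFailure()
        return self._comm.bcast(obj, root=root)

    # gather, scatter, send, recv, etc. would all need the same treatment,
    # which is what makes this approach intrusive and hard to future-proof.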

Failure Check Around Troublesome Calls

In code, usage would look like this:

with context.CheckFailure():
    commonFailingCode()

This context manager's __exit__() method sends/receives a message between the worker and main processes indicating whether commonFailingCode() threw an exception. Possibly something like this:

class CheckFailure:

    TAG_FAILURE = 93

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, stacktrace):
        # Check to see if an exception occurred
        err = any([exception_type, exception_value, stacktrace])
        if context.MPI_RANK == 0:
            # If the error occurred on the main process, simply let it propagate
            if err:
                return False
            # Otherwise, see if any worker processor has thrown an exception
            else:
                status = 0
                for rank in range(1, context.MPI_COMM.size):
                    status += context.MPI_COMM.recv(source=rank, tag=self.TAG_FAILURE)
                if status > 0:
                    raise RuntimeError(
                        "Failure message has been received from a worker processor"
                    )
        else:
            if err:
                # Log the error
                runLog.error(
                    "{}\n{}\n{}".format(exception_type, exception_value, stacktrace)
                )
                # Tell the main process to raise an exception
                context.MPI_COMM.send(1, dest=0, tag=self.TAG_FAILURE)
                # Gracefully exit and rely on the main process to call MPI_ABORT
                raise SystemExit
            else:
                # Otherwise, send a message saying no error has occurred
                context.MPI_COMM.send(0, dest=0, tag=self.TAG_FAILURE)

I'm not positive this will work exactly as written here, but the idea is there. Essentially, CheckFailure.__exit__() sends the main process a status message; if the main process receives a non-zero status, it raises its own exception, goes through interactError, and calls MPI_ABORT itself. The failing worker raises a SystemExit, which __main__.py would handle by not calling MPI_ABORT and just gracefully finalizing the process. This solution is much less intrusive, but it requires developers to put in their own checks, or else the original behavior returns. The major drawback is that if commonFailingCode() has its own MPI calls and an exception is thrown before them on a worker process, the run will hang. So developers need to be careful when inserting this into their code, making this a dangerous capability.
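On the __main__.py side, treating a worker's SystemExit as a graceful shutdown could look roughly like this (a sketch of the idea only, not ARMI's actual entry point; runEntryPoint is a hypothetical stand-in, and the Abort call mirrors the one shown above):

from armi import context


def main():
    try:
        runEntryPoint()  # hypothetical stand-in for the actual entry-point logic
    except SystemExit:
        # A worker that already notified the main process exits quietly;
        # the main process is the one that goes through interactError and Abort.
        pass
    except Exception:
        # Any other unhandled failure still tears down the whole MPI job.
        if context.MPI_COMM is not None:
            context.MPI_COMM.Abort(errorcode=-1)
        raise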

@zachmprince zachmprince added the bug Something is wrong: Highest Priority label Jul 23, 2024
@mgjarrett
Contributor

mgjarrett commented Jul 23, 2024

Additionally, using the syncDbAfterWrite setting does not work. You can't copy an HDF5 file from one location to another while the file handle is open. This raises a PermissionError.

Another way to solve the core issue of "database isn't available when worker process fails" is to get the syncDbAfterWrite setting to work. This would enable us to avoid the complicated MPI communication outlined above that would be required for the worker to notify the root process when it fails.

To get syncDbAfterWrite to work, we would need to close the HDF5 file handle before copying it from fast path to working directory, and then re-open the file. This is very simple in terms of implementation, but it is a "brute-force" solution in terms of bandwidth -- we're copying a file from one location to another multiple times during a run, and that file might be a few GB.
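The close/copy/reopen cycle would look roughly like this (a minimal sketch using h5py directly, with hypothetical names; the real change would live inside ARMI's database layer):

import shutil

import h5py


def syncDbToWorkingDir(h5file, fastPathName, workingDirName):
    """Close the HDF5 handle, copy the file out of the fast path, then reopen it."""
    h5file.close()  # release the handle so the OS allows the copy
    shutil.copy(fastPathName, workingDirName)
    # Reopen in append mode so subsequent time nodes can still be written.
    return h5py.File(fastPathName, "a")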

However, if we are running on a platform that has a relatively high-bandwidth connection between the fast path and the working directory, the syncDbAfterWrite solution would be attractive. We might expect on the order of 5-10 s for the database copy, which happens once per time node. This would be a negligible time penalty for a large, complex ARMI app that runs for anywhere from tens of minutes to a few hours at each time node.

@john-science
Member

@zachmprince What else needs to be done to close this ticket?
