diff --git a/cosmosis/main.py b/cosmosis/main.py index 2ffb0b29..b48e56ba 100755 --- a/cosmosis/main.py +++ b/cosmosis/main.py @@ -332,6 +332,14 @@ def run_cosmosis(ini, pool=None, pipeline=None, values=None, priors=None, overri number_samplers = len(sampler_classes) + if ini.has_option("pipeline", "failure_log"): + filename = ini.get("pipeline", "failure_log") + failure_log_file = mpi_pool.MPILogFile(filename, pool) + pipeline.set_failure_log_file(failure_log_file) + else: + failure_log_file = None + + #To start with we do not have any estimates of #anything the samplers might give us like centers @@ -440,6 +448,9 @@ def run_cosmosis(ini, pool=None, pipeline=None, values=None, priors=None, overri if cleanup_pipeline: pipeline.cleanup() + if failure_log_file is not None: + failure_log_file.close() + if profile_cpu: profile.disable() if (pool is not None) and (not smp): diff --git a/cosmosis/runtime/mpi_pool.py b/cosmosis/runtime/mpi_pool.py index 37bac24e..64ec276b 100644 --- a/cosmosis/runtime/mpi_pool.py +++ b/cosmosis/runtime/mpi_pool.py @@ -124,3 +124,39 @@ def __enter__(self): def __exit__(self, *args): self.close() + +class MPILogFile: + def __init__(self, filename, pool=None): + if hasattr(pool, "comm"): + self.comm = pool.comm + elif pool is None: + self.comm = None + else: + raise ValueError("Failure logging from multiple processes does not yet work using multiprocessing with --smp, only MPI using --mpi") + + self.filename = filename + if self.comm is None: + self.file = open(filename, "w") + else: + from mpi4py import MPI + self.MPI = MPI + self.file = MPI.File.Open(self.comm, filename, MPI.MODE_WRONLY | MPI.MODE_CREATE) + + def write(self, message): + if self.comm is None: + self.file.write(message) + else: + message = message.encode("utf-8") + self.file.Write_shared(message) + + def __del__(self): + if self.file is not None: + self.close() + + def close(self): + if self.comm is None: + self.file.close() + else: + self.file.Close() + self.file = None + diff --git a/cosmosis/runtime/pipeline.py b/cosmosis/runtime/pipeline.py index d040249e..6d5c4dc8 100644 --- a/cosmosis/runtime/pipeline.py +++ b/cosmosis/runtime/pipeline.py @@ -411,8 +411,23 @@ def __init__(self, arg=None, load=True, modules=None): print("and use the cached results from the first run for everything before that.") print("except the input parameter values. Think about this to check it's what you want.") self.shortcut_module = index + # Add a log file where failures are sent + # We only set this is a method instead of an init variable + # because we may want to activate it on pipelines created + # by the user and sent to run_cosmosis + self.failure_log_file = None + def set_failure_log_file(self, failure_log_file): + """ + Set the file where the pipeline will log failures. + Parameters + ---------- + failure_log_file : file-like object + The file where the pipeline will log failures. + This can be a regular file or an MPILogFile object + """ + self.failure_log_file = failure_log_file def find_module_file(self, path): u"""Find a module file, which is assumed to be either absolute or relative to COSMOSIS_SRC_DIR""" @@ -1113,6 +1128,9 @@ def run_parameters(self, p, check_ranges=False, all_params=False): return data else: sys.stderr.write("Pipeline failed on these parameters: {}\n".format(p)) + if self.failure_log_file is not None: + msg = " ".join([str(x) for x in p]) + self.failure_log_file.write(msg + "\n") return None diff --git a/cosmosis/test/test_pipeline.py b/cosmosis/test/test_pipeline.py index 47b5907c..20de5bf6 100644 --- a/cosmosis/test/test_pipeline.py +++ b/cosmosis/test/test_pipeline.py @@ -3,7 +3,7 @@ from cosmosis.samplers.sampler import Sampler from cosmosis.runtime.prior import TruncatedGaussianPrior, DeltaFunctionPrior from cosmosis.output.in_memory_output import InMemoryOutput -from cosmosis.main import run_cosmosis, parser +from cosmosis.main import run_cosmosis, parser, mpi_pool import numpy as np import os import tempfile @@ -265,6 +265,46 @@ def log_like(p): assert len(r.extra) == 0 +def test_failure_log(): + def log_like(p): + if p[0] < 0: + print("Deliberately failing pipeline ...") + raise ValueError("p0 must be positive") + return -0.5 * np.sum(p**2) + + param_ranges = [ + (-3.0, 0.0, 3.0), + (-3.0, 0.0, 3.0), + (-3.0, 0.0, 3.0), + (-3.0, 0.0, 3.0), + ] + + with tempfile.TemporaryDirectory() as dirname: + logname = f"{dirname}/failure.log" + failure_log = mpi_pool.MPILogFile(logname) + + pipeline = LikelihoodPipeline.from_likelihood_function(log_like, param_ranges) + pipeline.set_failure_log_file(failure_log) + # two that should work and produce nothing + # and two that should log some errors + param_sets = [ + [0.0,0.0,0.0,0.0], + [1.0,0.0,0.0,0.0], + [-1.0,0.0,0.0,4.0], + [-2.0,0.0,0.0,8.0], + ] + print("Pipeline errors are expected below - we are testing logging of them") + for p in param_sets: + pipeline.run_parameters(p) + + failure_log.close() + with open(logname) as f: + lines = f.readlines() + assert len(lines) == 2 + assert lines[0].strip() == "-1.0 0.0 0.0 4.0" + assert lines[1].strip() == "-2.0 0.0 0.0 8.0" + + if __name__ == '__main__': test_script_skip() diff --git a/cosmosis/version.py b/cosmosis/version.py index 4339557c..98482227 100644 --- a/cosmosis/version.py +++ b/cosmosis/version.py @@ -1 +1 @@ -__version__ = '3.10' +__version__ = '3.11'