Skip to content

Commit

Permalink
Merge pull request #135 from joezuntz/failure-log
Browse files Browse the repository at this point in the history
Add an option to log all failed parameter sets to a file
  • Loading branch information
joezuntz authored Jul 25, 2024
2 parents 4574989 + 5e19527 commit ae1b65c
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 2 deletions.
11 changes: 11 additions & 0 deletions cosmosis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,14 @@ def run_cosmosis(ini, pool=None, pipeline=None, values=None, priors=None, overri

number_samplers = len(sampler_classes)

if ini.has_option("pipeline", "failure_log"):
filename = ini.get("pipeline", "failure_log")
failure_log_file = mpi_pool.MPILogFile(filename, pool)
pipeline.set_failure_log_file(failure_log_file)
else:
failure_log_file = None



#To start with we do not have any estimates of
#anything the samplers might give us like centers
Expand Down Expand Up @@ -440,6 +448,9 @@ def run_cosmosis(ini, pool=None, pipeline=None, values=None, priors=None, overri
if cleanup_pipeline:
pipeline.cleanup()

if failure_log_file is not None:
failure_log_file.close()

if profile_cpu:
profile.disable()
if (pool is not None) and (not smp):
Expand Down
36 changes: 36 additions & 0 deletions cosmosis/runtime/mpi_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,39 @@ def __enter__(self):

def __exit__(self, *args):
self.close()

class MPILogFile:
    """Write-only log file that can be shared between MPI processes.

    With no pool, the log is an ordinary text file opened with ``open``.
    With a pool that exposes a ``comm`` attribute, the file is opened with
    ``MPI.File.Open`` on that communicator and written with
    ``Write_shared``.

    Parameters
    ----------
    filename : str
        Path of the log file to create.
    pool : object, optional
        ``None`` for single-process use, or an object with a ``comm``
        attribute holding the communicator to open the file on.

    Raises
    ------
    ValueError
        If ``pool`` is not ``None`` and has no ``comm`` attribute.
    """

    def __init__(self, filename, pool=None):
        # Set this before anything can raise, so that close() and __del__
        # are safe on a partially constructed object.
        self.file = None

        if hasattr(pool, "comm"):
            self.comm = pool.comm
        elif pool is None:
            self.comm = None
        else:
            raise ValueError("Failure logging from multiple processes does not yet work using multiprocessing with --smp, only MPI using --mpi")

        self.filename = filename
        if self.comm is None:
            self.file = open(filename, "w")
        else:
            # Imported lazily so that mpi4py is only needed for MPI runs
            from mpi4py import MPI
            self.MPI = MPI
            # NOTE(review): unlike the "w" mode used above, these flags do
            # not request truncation, so content of a pre-existing file may
            # persist - confirm whether that is intended.
            self.file = MPI.File.Open(self.comm, filename, MPI.MODE_WRONLY | MPI.MODE_CREATE)

    def write(self, message):
        """Append the string ``message`` to the log."""
        if self.comm is None:
            self.file.write(message)
        else:
            # The MPI file takes a byte buffer, not a str
            message = message.encode("utf-8")
            self.file.Write_shared(message)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        # close() does nothing if no file is open
        self.close()

    def close(self):
        """Close the underlying file. Calling this more than once is safe."""
        handle = getattr(self, "file", None)
        if handle is None:
            # Never opened, or already closed
            return
        try:
            if self.comm is None:
                handle.close()
            else:
                handle.Close()
        finally:
            self.file = None

18 changes: 18 additions & 0 deletions cosmosis/runtime/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,23 @@ def __init__(self, arg=None, load=True, modules=None):
print("and use the cached results from the first run for everything before that.")
print("except the input parameter values. Think about this to check it's what you want.")
self.shortcut_module = index
# Add a log file where failures are sent
# We only set this in a method instead of an init variable
# because we may want to activate it on pipelines created
# by the user and sent to run_cosmosis
self.failure_log_file = None

def set_failure_log_file(self, failure_log_file):
    """
    Attach the log that receives the parameter sets this pipeline fails on.

    Parameters
    ----------
    failure_log_file : file-like object
        Destination for the failure records. This can be a regular
        file or an MPILogFile object.
    """
    self.failure_log_file = failure_log_file

def find_module_file(self, path):
u"""Find a module file, which is assumed to be either absolute or relative to COSMOSIS_SRC_DIR"""
Expand Down Expand Up @@ -1113,6 +1128,9 @@ def run_parameters(self, p, check_ranges=False, all_params=False):
return data
else:
sys.stderr.write("Pipeline failed on these parameters: {}\n".format(p))
if self.failure_log_file is not None:
msg = " ".join([str(x) for x in p])
self.failure_log_file.write(msg + "\n")
return None


Expand Down
42 changes: 41 additions & 1 deletion cosmosis/test/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from cosmosis.samplers.sampler import Sampler
from cosmosis.runtime.prior import TruncatedGaussianPrior, DeltaFunctionPrior
from cosmosis.output.in_memory_output import InMemoryOutput
from cosmosis.main import run_cosmosis, parser
from cosmosis.main import run_cosmosis, parser, mpi_pool
import numpy as np
import os
import tempfile
Expand Down Expand Up @@ -265,6 +265,46 @@ def log_like(p):
assert len(r.extra) == 0


def test_failure_log():
    """Check that failed parameter sets, and only those, reach the failure log."""

    def log_like(p):
        # A negative first parameter is treated as a deliberate failure
        if p[0] < 0:
            print("Deliberately failing pipeline ...")
            raise ValueError("p0 must be positive")
        return -0.5 * np.sum(p**2)

    # Four identical parameter range triples
    param_ranges = [(-3.0, 0.0, 3.0) for _ in range(4)]

    # The first two sets should work and produce nothing in the log;
    # the last two have a negative p0 and should each be logged.
    param_sets = [
        [0.0, 0.0, 0.0, 0.0],
        [1.0, 0.0, 0.0, 0.0],
        [-1.0, 0.0, 0.0, 4.0],
        [-2.0, 0.0, 0.0, 8.0],
    ]
    expected_lines = ["-1.0 0.0 0.0 4.0", "-2.0 0.0 0.0 8.0"]

    with tempfile.TemporaryDirectory() as dirname:
        logname = f"{dirname}/failure.log"
        failure_log = mpi_pool.MPILogFile(logname)

        pipeline = LikelihoodPipeline.from_likelihood_function(log_like, param_ranges)
        pipeline.set_failure_log_file(failure_log)

        print("Pipeline errors are expected below - we are testing logging of them")
        for params in param_sets:
            pipeline.run_parameters(params)

        # Close before reading so everything written is on disk
        failure_log.close()
        with open(logname) as f:
            logged = [line.strip() for line in f.readlines()]
        assert logged == expected_lines



if __name__ == '__main__':
test_script_skip()
2 changes: 1 addition & 1 deletion cosmosis/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '3.10'
__version__ = '3.11'

0 comments on commit ae1b65c

Please sign in to comment.