From 3721662099ef1107a4c539a214e823bd9dc4f8a5 Mon Sep 17 00:00:00 2001 From: Jessica Pilling Date: Fri, 11 Aug 2023 18:04:47 +0100 Subject: [PATCH] moving xga.sas to xga.generate --- xga/generate/sas/__init__.py | 9 + xga/generate/sas/_common.py | 200 +++++++++ xga/generate/sas/misc.py | 93 ++++ xga/generate/sas/phot.py | 511 +++++++++++++++++++++ xga/generate/sas/run.py | 279 ++++++++++++ xga/generate/sas/spec.py | 849 +++++++++++++++++++++++++++++++++++ 6 files changed, 1941 insertions(+) create mode 100644 xga/generate/sas/__init__.py create mode 100644 xga/generate/sas/_common.py create mode 100644 xga/generate/sas/misc.py create mode 100644 xga/generate/sas/phot.py create mode 100644 xga/generate/sas/run.py create mode 100644 xga/generate/sas/spec.py diff --git a/xga/generate/sas/__init__.py b/xga/generate/sas/__init__.py new file mode 100644 index 00000000..3469eb96 --- /dev/null +++ b/xga/generate/sas/__init__.py @@ -0,0 +1,9 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 09/05/2023, 11:33. Copyright (c) The Contributors + +from ._common import region_setup +from .phot import * +from .spec import * + + + diff --git a/xga/generate/sas/_common.py b/xga/generate/sas/_common.py new file mode 100644 index 00000000..bee45a41 --- /dev/null +++ b/xga/generate/sas/_common.py @@ -0,0 +1,200 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 09/05/2023, 17:11. Copyright (c) The Contributors +import os.path +import warnings +from typing import Union, Tuple, List + +from astropy.units import Quantity + +from xga.exceptions import NotAssociatedError +from xga.samples import BaseSample +from xga.sources import BaseSource, NullSource, GalaxyCluster +from xga.utils import RAD_LABELS, OUTPUT + + +def region_setup(sources: Union[BaseSource, BaseSample], outer_radius: Union[str, Quantity], + inner_radius: Union[str, Quantity], disable_progress: bool, obs_id: str) \ + -> Tuple[Union[BaseSource, BaseSample], List[Quantity], List[Quantity]]: + """ + The preparation and value checking stage for SAS spectrum generation. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param str/Quantity outer_radius: The name or value of the outer radius to use for the generation of + the spectrum (for instance 'r200' would be acceptable for a GalaxyCluster, or Quantity(1000, 'kpc')). + :param str/Quantity inner_radius: The name or value of the inner radius to use for the generation of + the spectrum (for instance 'r500' would be acceptable for a GalaxyCluster, or Quantity(300, 'kpc')). By + default this is zero arcseconds, resulting in a circular spectrum. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + :param str obs_id: Only used if the 'region' radius name is passed, the ObsID to retrieve the region for. + :return: The source objects, a list of inner radius quantities, and a list of outer radius quantities. + :rtype: Tuple[Union[BaseSource, BaseSample], List[Quantity], List[Quantity]] + """ + # NullSources are not allowed to have spectra, as they can have any observations associated and thus won't + # necessarily overlap + if isinstance(sources, NullSource): + raise TypeError("You cannot create spectra of a NullSource") + + if isinstance(sources, BaseSource): + sources = [sources] + + # Checking that the user hasn't passed BaseSources + if not all([type(src) != BaseSource for src in sources]): + raise TypeError("You cannot generate spectra from a BaseSource object, really you shouldn't be using " + "them at all, they are mostly useful as a superclass.") + + # Issuing a warning to the user that one or one sources have not been detected + if not all([src.detected for src in sources]): + warnings.warn("Not all of these sources have been detected, the spectra generated may not be helpful.") + + # Checking that inner radii that have been passed into the spectrum generation aren't nonsense + if isinstance(inner_radius, str) and inner_radius not in RAD_LABELS: + raise ValueError("You have passed a radius name rather than a value for 'inner_radius', but it is " + "not a valid name, please use one of the following:\n {}".format(", ".join(RAD_LABELS))) + + elif isinstance(inner_radius, str) and inner_radius in ["r2500", "r500", "r200"] and \ + not all([type(src) == GalaxyCluster for src in sources]): + raise TypeError("The {} radius is only valid for GalaxyCluster objects".format(inner_radius)) + + # One radius can be passed for a whole sample, but this checks to make sure that if there are multiple sources, + # and multiple radii have been passed, there are the same number of sources and radii + elif isinstance(inner_radius, Quantity) and len(sources) != 1 and not inner_radius.isscalar \ + and len(sources) != len(inner_radius): + raise ValueError("Your sample has {s} sources, but your inner_radius variable only has {i} entries. Please " + "pass only one inner_radius or the same number as there are " + "sources".format(s=len(sources), i=len(inner_radius))) + + # Checking that outer_radius radii that have been passed into the spectrum generation aren't nonsense + if isinstance(outer_radius, str) and outer_radius not in RAD_LABELS: + raise ValueError("You have passed a radius name rather than a value for 'outer_radius', but it is " + "not a valid name, please use one of the following:\n {}".format(", ".join(RAD_LABELS))) + elif isinstance(outer_radius, str) and outer_radius in ["r2500", "r500", "r200"] and \ + not all([type(src) == GalaxyCluster for src in sources]): + raise TypeError("The {} radius is only valid for GalaxyCluster objects".format(outer_radius)) + elif isinstance(outer_radius, Quantity) and len(sources) != 1 and not outer_radius.isscalar \ + and len(sources) != len(outer_radius): + raise ValueError("Your sample has {s} sources, but your outer_radius variable only has {o} entries. Please " + "pass only one outer_radius or the same number as there are " + "sources".format(s=len(sources), o=len(outer_radius))) + + # A crude way to store the radii but I'm tired and this will work fine + final_inner = [] + final_outer = [] + # I need to convert the radii to the same units and compare them, and to make sure they + # are actually in distance units. The distance unit checking is done by convert_radius + for s_ind, src in enumerate(sources): + # Converts the inner and outer radius for this source into the same unit + if isinstance(outer_radius, str) and outer_radius != 'region': + cur_out_rad = src.get_radius(outer_radius, 'deg') + elif isinstance(outer_radius, str) and outer_radius == 'region': + reg = src.source_back_regions('region', obs_id)[0] + cur_out_rad = Quantity([reg.width.to('deg').value/2, reg.height.to('deg').value/2], 'deg') + elif outer_radius.isscalar: + cur_out_rad = src.convert_radius(outer_radius, 'deg') + else: + cur_out_rad = src.convert_radius(outer_radius[s_ind], 'deg') + + # We need to check that the outer radius isn't region, because for region objects we ignore whatever + # inner radius has been passed and just set it 0 + if outer_radius == 'region': + cur_inn_rad = Quantity([0, 0], 'deg') + elif isinstance(inner_radius, str): + cur_inn_rad = src.get_radius(inner_radius, 'deg') + elif inner_radius.isscalar: + cur_inn_rad = src.convert_radius(inner_radius, 'deg') + else: + cur_inn_rad = src.convert_radius(inner_radius[s_ind], 'deg') + + # Then we can check to make sure that the outer radius is larger than the inner radius + if outer_radius != 'region' and cur_inn_rad >= cur_out_rad: + raise ValueError("The inner_radius of {s} is greater than or equal to the outer_radius".format(s=src.name)) + else: + final_inner.append(cur_inn_rad) + final_outer.append(cur_out_rad) + + return sources, final_inner, final_outer + + +def _gen_detmap_cmd(source: BaseSource, obs_id: str, inst: str, bin_size: int = 200) -> Tuple[str, str, str]: + """ + + :param BaseSource source: The source for which the parent method is generating ARFs for, and that needs + a detector map. + :param str obs_id: The ObsID of the data we are generating ARFs for. + :param str inst: The instrument of the data we are generating ARFs for. NOTE - ideally this instrument WILL NOT + be used for the detector map, as it is beneficial to source a detector map from a different instrument to + the one you are generating ARFs for. + :param int bin_size: The x and y binning that should be applied to the image. Larger numbers will cause ARF + generation to be faster, but arguably the results will be less accurate. + :return: The command to generate the requested detector map (will be blank if the detector map already + exists), the path where the detmap will be after the command is run (i.e. the ObsID directory if it was + already generated, or the temporary directory if it has just been generated), and the final output path + of the detector. + :rtype: Tuple[str, str, str] + """ + # This is the command that will be filled out to generate the detmap of our dreams! + detmap_cmd = "evselect table={e} imageset={d} xcolumn=DETX ycolumn=DETY imagebinning=binSize ximagebinsize={bs} " \ + "yimagebinsize={bs} {ex}" + + # Some settings depend on the instrument, we use different patterns for different instruments + if "pn" in inst: + # Also the upper channel limit is different for EPN and EMOS detectors + spec_lim = 20479 + # This is an expression without region information to be used for making the detmaps + # required for ARF generation, we start off assuming we'll use a MOS observation as the detmap + d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'" + + # The detmap for the arfgen call should ideally not be from the same instrument as the observation, + # so for PN we preferentially select MOS2 (as MOS1 was damaged). However if there isn't a MOS2 + # events list from the same observation then we select MOS1, and failing that we use PN. + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos2')[0] + except NotAssociatedError: + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos1')[0] + except NotAssociatedError: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0] + # If all is lost and there are no MOS event lists then we must revert to the PN expression + d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'" + + elif "mos" in inst: + spec_lim = 11999 + # This is an expression without region information to be used for making the detmaps + # required for ARF generation, we start off assuming we'll use the PN observation as the detmap + d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'" + + # The detmap for the arfgen call should ideally not be from the same instrument as the observation, + # so for MOS observations we preferentially select PN. However if there isn't a PN events list + # from the same observation then for MOS2 we select MOS1, and for MOS1 we select MOS2 (as they + # are rotated wrt one another it is still semi-valid), and failing that MOS2 will get MOS2 and MOS1 + # will get MOS1. + if inst[-1] == 1: + cur = '1' + opp = '2' + else: + cur = '2' + opp = '1' + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0] + except NotAssociatedError: + # If we must use a MOS detmap then we have to use the MOS expression + d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'" + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos' + opp)[0] + except NotAssociatedError: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos' + cur)[0] + else: + raise ValueError("You somehow have an illegal value for the instrument name...") + + det_map = "{o}_{i}_bin{bs}_detmap.fits".format(o=detmap_evts.obs_id, i=detmap_evts.instrument, bs=bin_size) + det_map_path = os.path.join(OUTPUT, obs_id, det_map) + + # If the detmap that we've decided we need already exists, then we don't need to generate it again + if os.path.exists(det_map_path): + d_cmd_str = "" + det_map_cmd_path = det_map_path + else: + # Does the same thing for the evselect command to make the detmap + d_cmd_str = detmap_cmd.format(e=detmap_evts.path, d=det_map, ex=d_expr, bs=bin_size) + "; " + det_map_cmd_path = det_map + + return d_cmd_str, det_map_cmd_path, det_map_path diff --git a/xga/generate/sas/misc.py b/xga/generate/sas/misc.py new file mode 100644 index 00000000..739bdf67 --- /dev/null +++ b/xga/generate/sas/misc.py @@ -0,0 +1,93 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 20/02/2023, 14:04. Copyright (c) The Contributors + +import os +from random import randint +from typing import Union + +import numpy as np +from fitsio import read_header + +from .run import sas_call +from .. import OUTPUT, NUM_CORES +from ..exceptions import InvalidProductError +from ..samples.base import BaseSample +from ..sources import BaseSource +from ..sources.base import NullSource + + +@sas_call +def cifbuild(sources: Union[BaseSource, NullSource, BaseSample], num_cores: int = NUM_CORES, + disable_progress: bool = False): + """ + A wrapper for the XMM cifbuild command, which will be run before many of the more complex SAS commands, to + check that a CIF compatible with the local version of SAS is available. The observation date is taken from an + event list for a given ObsID, and the analysis date is set to the date which this function is run. + + :param BaseSource/NullSource/BaseSample sources: A single source object, or a sample of sources. + :param int num_cores: The number of cores to use (if running locally), default is set to + 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + # This function supports passing both individual sources and sets of sources + if isinstance(sources, (BaseSource, NullSource)): + sources = [sources] + + # This string contains the bash code to run cifbuild + cif_cmd = "cd {d}; cifbuild calindexset=ccf.cif withobservationdate=yes " \ + "observationdate={od} ; mv * ../; cd ..; rm -r {n}" + + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for source in sources: + cmds = [] + final_paths = [] + extra_info = [] + for obs_id in source.obs_ids: + # Fetch an events list for this ObsID, doesn't matter which + some_evt_lists = source.get_products("events", obs_id=obs_id) + obs_date = None + for evt in some_evt_lists: + # Reads in the header of the events list file + evt_head = read_header(evt.path) + # Then extracts the observation date, this is what we need to give cifbuild + if "DATE-OBS" in evt_head: + obs_date = evt_head['DATE-OBS'] + del evt_head + break + else: + del evt_head + + if obs_date is None: + raise InvalidProductError("All event lists for {} are missing the DATE-OBS header, this is required to" + " run the cifbuild function.".format(obs_id)) + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + dest_dir = "{out}{obs}/".format(out=OUTPUT, obs=obs_id) + temp_name = "tempdir_{}".format(randint(0, 1e+8)) + temp_dir = dest_dir + temp_name + "/" + + final_path = dest_dir + "ccf.cif" + if not os.path.exists(final_path): + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + cmds.append(cif_cmd.format(d=temp_dir, od=obs_date, n=temp_name)) + final_paths.append(final_path) + extra_info.append({}) # This doesn't need any extra information + + sources_cmds.append(np.array(cmds)) + sources_paths.append(np.array(final_paths)) + sources_extras.append(np.array(extra_info)) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value="ccf")) + + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + + diff --git a/xga/generate/sas/phot.py b/xga/generate/sas/phot.py new file mode 100644 index 00000000..6da81da5 --- /dev/null +++ b/xga/generate/sas/phot.py @@ -0,0 +1,511 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 03/06/2023, 14:53. Copyright (c) The Contributors + +import os +from random import randint +from shutil import rmtree +from typing import Union + +import numpy as np +from astropy.units import Quantity, deg +from tqdm import tqdm + +from .misc import cifbuild +from .run import sas_call +from .. import OUTPUT, NUM_CORES +from ..exceptions import SASInputInvalid, NoProductAvailableError +from ..imagetools import data_limits +from ..samples.base import BaseSample +from ..sources import BaseSource +from ..sources.base import NullSource +from ..utils import energy_to_channel + + +# TODO Perhaps remove the option to add to the SAS expression +@sas_call +def evselect_image(sources: Union[BaseSource, NullSource, BaseSample], lo_en: Quantity = Quantity(0.5, 'keV'), + hi_en: Quantity = Quantity(2.0, 'keV'), add_expr: str = "", num_cores: int = NUM_CORES, + disable_progress: bool = False): + """ + A convenient Python wrapper for a configuration of the SAS evselect command that makes images. + Images will be generated for every observation associated with every source passed to this function. + If images in the requested energy band are already associated with the source, + they will not be generated again. + + :param BaseSource/NullSource/BaseSample sources: A single source object, or a sample of sources. + :param Quantity lo_en: The lower energy limit for the image, in astropy energy units. + :param Quantity hi_en: The upper energy limit for the image, in astropy energy units. + :param str add_expr: A string to be added to the SAS expression keyword + :param int num_cores: The number of cores to use, default is set to 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + # This function supports passing both individual sources and sets of sources + if isinstance(sources, (BaseSource, NullSource)): + sources = [sources] + + # Don't do much value checking in this module, but this one is so fundamental that I will do it + if lo_en > hi_en: + raise ValueError("lo_en cannot be greater than hi_en") + else: + # Calls a useful little function that takes an astropy energy quantity to the XMM channels + # required by SAS commands + lo_chan = energy_to_channel(lo_en) + hi_chan = energy_to_channel(hi_en) + + expr = " && ".join([e for e in ["expression='(PI in [{l}:{u}])".format(l=lo_chan, u=hi_chan), + add_expr] if e != ""]) + "'" + # These lists are to contain the lists of commands/paths/etc for each of the individual sources passed + # to this function + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for source in sources: + cmds = [] + final_paths = [] + extra_info = [] + # Check which event lists are associated with each individual source + for pack in source.get_products("events", just_obj=False): + obs_id = pack[0] + inst = pack[1] + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + en_id = "bound_{l}-{u}".format(l=lo_en.value, u=hi_en.value) + exists = [match for match in source.get_products("image", obs_id, inst, just_obj=False) + if en_id in match] + if len(exists) == 1 and exists[0][-1].usable: + continue + + evt_list = pack[-1] + dest_dir = OUTPUT + "{o}/{i}_{l}-{u}_{n}_temp/".format(o=obs_id, i=inst, l=lo_en.value, u=hi_en.value, + n=source.name) + im = "{o}_{i}_{l}-{u}keVimg.fits".format(o=obs_id, i=inst, l=lo_en.value, u=hi_en.value) + + # If something got interrupted and the temp directory still exists, this will remove it + if os.path.exists(dest_dir): + rmtree(dest_dir) + + os.makedirs(dest_dir) + cmds.append("cd {d};evselect table={e} imageset={i} xcolumn=X ycolumn=Y ximagebinsize=87 " + "yimagebinsize=87 squarepixels=yes ximagesize=512 yimagesize=512 imagebinning=binSize " + "ximagemin=3649 ximagemax=48106 withxranges=yes yimagemin=3649 yimagemax=48106 " + "withyranges=yes {ex}; mv * ../; cd ..; rm -r {d}".format(d=dest_dir, e=evt_list.path, + i=im, ex=expr)) + + # This is the products final resting place, if it exists at the end of this command + final_paths.append(os.path.join(OUTPUT, obs_id, im)) + extra_info.append({"lo_en": lo_en, "hi_en": hi_en, "obs_id": obs_id, "instrument": inst}) + sources_cmds.append(np.array(cmds)) + sources_paths.append(np.array(final_paths)) + # This contains any other information that will be needed to instantiate the class + # once the SAS cmd has run + sources_extras.append(np.array(extra_info)) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value="image")) + + # I only return num_cores here so it has a reason to be passed to this function, really + # it could just be picked up in the decorator. + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + +@sas_call +def eexpmap(sources: Union[BaseSource, NullSource, BaseSample], lo_en: Quantity = Quantity(0.5, 'keV'), + hi_en: Quantity = Quantity(2.0, 'keV'), num_cores: int = NUM_CORES, disable_progress: bool = False): + """ + A convenient Python wrapper for the SAS eexpmap command. + Expmaps will be generated for every observation associated with every source passed to this function. + If expmaps in the requested energy band are already associated with the source, + they will not be generated again. + + :param BaseSource/NullSource/BaseSample sources: A single source object, or sample of sources. + :param Quantity lo_en: The lower energy limit for the expmap, in astropy energy units. + :param Quantity hi_en: The upper energy limit for the expmap, in astropy energy units. + :param int num_cores: The number of cores to use (if running locally), default is set to + 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + # I know that a lot of this code is the same as the evselect_image code, but its 1am so please don't + # judge me too much. + + # This function supports passing both individual sources and sets of sources + if isinstance(sources, (BaseSource, NullSource)): + sources = [sources] + + # Don't do much value checking in this module, but this one is so fundamental that I will do it + if lo_en > hi_en: + raise ValueError("lo_en cannot be greater than hi_en") + else: + # Calls a useful little function that takes an astropy energy quantity to the XMM channels + # required by SAS commands + lo_chan = energy_to_channel(lo_en) + hi_chan = energy_to_channel(hi_en) + + # These are crucial, to generate an exposure map one must have a ccf.cif calibration file, and a reference + # image. If they do not already exist, these commands should generate them. + cifbuild(sources, disable_progress=disable_progress, num_cores=num_cores) + sources = evselect_image(sources, lo_en, hi_en) + # This is necessary because the decorator will reduce a one element list of source objects to a single + # source object. Useful for the user, not so much here where the code expects an iterable. + if not isinstance(sources, (list, BaseSample)): + sources = [sources] + + # These lists are to contain the lists of commands/paths/etc for each of the individual sources passed + # to this function + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for source in sources: + cmds = [] + final_paths = [] + extra_info = [] + # Check which event lists are associated with each individual source + for pack in source.get_products("events", just_obj=False): + obs_id = pack[0] + inst = pack[1] + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + en_id = "bound_{l}-{u}".format(l=lo_en.value, u=hi_en.value) + exists = [match for match in source.get_products("expmap", obs_id, inst, just_obj=False) + if en_id in match] + if len(exists) == 1 and exists[0][-1].usable: + continue + # Generating an exposure map requires a reference image. + ref_im = [match for match in source.get_products("image", obs_id, inst, just_obj=False) + if en_id in match][0][-1] + # It also requires an attitude file + att = source.get_att_file(obs_id) + # Set up the paths and names of files + evt_list = pack[-1] + dest_dir = OUTPUT + "{o}/{i}_{l}-{u}_{n}_temp/".format(o=obs_id, i=inst, l=lo_en.value, u=hi_en.value, + n=source.name) + exp_map = "{o}_{i}_{l}-{u}keVexpmap.fits".format(o=obs_id, i=inst, l=lo_en.value, u=hi_en.value) + + # If something got interrupted and the temp directory still exists, this will remove it + if os.path.exists(dest_dir): + rmtree(dest_dir) + + os.makedirs(dest_dir) + cmds.append("cd {d}; cp ../ccf.cif .; export SAS_CCF={ccf}; eexpmap eventset={e} " + "imageset={im} expimageset={eim} withdetcoords=no withvignetting=yes " + "attitudeset={att} pimin={l} pimax={u}; mv * ../; cd ..; " + "rm -r {d}".format(e=evt_list.path, im=ref_im.path, eim=exp_map, att=att, l=lo_chan, + u=hi_chan, d=dest_dir, ccf=dest_dir + "ccf.cif")) + + # This is the products final resting place, if it exists at the end of this command + final_paths.append(os.path.join(OUTPUT, obs_id, exp_map)) + extra_info.append({"lo_en": lo_en, "hi_en": hi_en, "obs_id": obs_id, "instrument": inst}) + sources_cmds.append(np.array(cmds)) + sources_paths.append(np.array(final_paths)) + # This contains any other information that will be needed to instantiate the class + # once the SAS cmd has run + sources_extras.append(np.array(extra_info)) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value="expmap")) + + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + # I only return num_cores here so it has a reason to be passed to this function, really + # it could just be picked up in the decorator. + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + +@sas_call +def emosaic(sources: Union[BaseSource, BaseSample], to_mosaic: str, lo_en: Quantity = Quantity(0.5, 'keV'), + hi_en: Quantity = Quantity(2.0, 'keV'), psf_corr: bool = False, psf_model: str = "ELLBETA", + psf_bins: int = 4, psf_algo: str = "rl", psf_iter: int = 15, num_cores: int = NUM_CORES, + disable_progress: bool = False): + """ + A convenient Python wrapper for the SAS emosaic command. Every image associated with the source, + that is in the energy band specified by the user, will be added together. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param str to_mosaic: The data type to produce a mosaic for, can be either image or expmap. + :param Quantity lo_en: The lower energy limit for the combined image, in astropy energy units. + :param Quantity hi_en: The upper energy limit for the combined image, in astropy energy units. + :param bool psf_corr: If True, PSF corrected images will be mosaiced. + :param str psf_model: If PSF corrected, the PSF model used. + :param int psf_bins: If PSF corrected, the number of bins per side. + :param str psf_algo: If PSF corrected, the algorithm used. + :param int psf_iter: If PSF corrected, the number of algorithm iterations. + :param int num_cores: The number of cores to use (if running locally), default is set to + 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + # This function supports passing both individual sources and sets of sources + if isinstance(sources, BaseSource): + sources = [sources] + + # NullSources are not allowed to be mosaiced, as they can have any observations associated and thus won't + # necessarily overlap + if isinstance(sources, NullSource): + raise TypeError("You cannot create combined images of a NullSource") + + if to_mosaic not in ["image", "expmap"]: + raise ValueError("The only valid choices for to_mosaic are image and expmap.") + # Don't do much value checking in this module, but this one is so fundamental that I will do it + elif lo_en > hi_en: + raise ValueError("lo_en cannot be greater than hi_en") + + # To make a mosaic we need to have the individual products in the first place + if to_mosaic == "image": + sources = evselect_image(sources, lo_en, hi_en, disable_progress=disable_progress, num_cores=num_cores) + for_name = "img" + elif to_mosaic == "expmap": + sources = eexpmap(sources, lo_en, hi_en, disable_progress=disable_progress, num_cores=num_cores) + for_name = "expmap" + + # This is necessary because the decorator will reduce a one element list of source objects to a single + # source object. Useful for the user, not so much here where the code expects an iterable. + if not isinstance(sources, (list, BaseSample)): + sources = [sources] + + # The bit on the end takes everything up out of the temporary folder and removes it + mosaic_cmd = "cd {d}; emosaic imagesets='{ims}' mosaicedset={mim}; mv * ../; cd ..; rm -r {d}" + + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for source in sources: + en_id = "bound_{l}-{u}".format(l=lo_en.value, u=hi_en.value) + # If we're mosaicing PSF corrected images, we need to + if psf_corr and to_mosaic == "expmap": + raise ValueError("There can be no PSF corrected expmaps to mosaic, it doesn't make sense.") + elif psf_corr: + en_id += "_" + psf_model + "_" + str(psf_bins) + "_" + psf_algo + str(psf_iter) + + # Checking if the combined product already exists + exists = [match for match in source.get_products("combined_{}".format(to_mosaic), just_obj=False) + if en_id in match] + if len(exists) == 1 and exists[0][-1].usable: + sources_cmds.append(np.array([])) + sources_paths.append(np.array([])) + sources_extras.append(np.array([])) + sources_types.append(np.array([])) + continue + + # This fetches all image objects with the passed energy bounds + matches = [[match[0], match[-1]] for match in source.get_products(to_mosaic, just_obj=False) + if en_id in match] + paths = [product[1].path for product in matches if product[1].usable] + obs_ids = [product[0] for product in matches if product[1].usable] + obs_ids_set = [] + for obs_id in obs_ids: + if obs_id not in obs_ids_set: + obs_ids_set.append(obs_id) + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + # The files produced by this function will now be stored in the combined directory. + final_dest_dir = OUTPUT + "combined/" + rand_ident = randint(0, 1e+8) + # Makes absolutely sure that the random integer hasn't already been used + while len([f for f in os.listdir(final_dest_dir) if str(rand_ident) in f.split(OUTPUT+"combined/")[-1]]) != 0: + rand_ident = randint(0, 1e+8) + + dest_dir = os.path.join(final_dest_dir, "temp_emosaic_{}".format(rand_ident)) + os.mkdir(dest_dir) + + # The name of the file used to contain all the ObsIDs that went into the stacked image/expmap. However + # this caused problems when too many ObsIDs were present and the filename was longer than allowed. So + # now I use the random identity I generated, and store the ObsID/instrument information in the inventory + # file + if not psf_corr: + mosaic = "{os}_{l}-{u}keVmerged_{t}.fits".format(os=rand_ident, l=lo_en.value, u=hi_en.value, t=for_name) + else: + mosaic = "{os}_{b}bin_{it}iter_{m}mod_{a}algo_{l}-{u}keVpsfcorr_merged_img." \ + "fits".format(os=rand_ident, l=lo_en.value, u=hi_en.value, b=psf_bins, it=psf_iter, a=psf_algo, + m=psf_model) + + sources_cmds.append(np.array([mosaic_cmd.format(ims=" ".join(paths), mim=mosaic, d=dest_dir)])) + sources_paths.append(np.array([os.path.join(final_dest_dir, mosaic)])) + # This contains any other information that will be needed to instantiate the class + # once the SAS cmd has run + # The 'combined' values for obs and inst here are crucial, they will tell the source object that the final + # product is assigned to that these are merged products - combinations of all available data + sources_extras.append(np.array([{"lo_en": lo_en, "hi_en": hi_en, "obs_id": "combined", + "instrument": "combined", "psf_corr": psf_corr, "psf_algo": psf_algo, + "psf_model": psf_model, "psf_iter": psf_iter, "psf_bins": psf_bins}])) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value=to_mosaic)) + + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + # I only return num_cores here so it has a reason to be passed to this function, really + # it could just be picked up in the decorator. + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + +@sas_call +def psfgen(sources: Union[BaseSource, BaseSample], bins: int = 4, psf_model: str = "ELLBETA", + num_cores: int = NUM_CORES, disable_progress: bool = False): + """ + A wrapper for the psfgen SAS task. Used to generate XGA PSF objects, which in turn can be used to correct + XGA images/ratemaps for optical effects. By default we use the ELLBETA model reported in Read et al. 2011 + (doi:10.1051/0004-6361/201117525), and generate a grid of binsxbins PSFs that can be used + to correct for the PSF over an entire image. The energy dependence of the PSF is assumed to be minimal, and the + resultant PSF object will be paired up with an image that matches it's ObsID and instrument. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param int bins: The image coordinate space will be divided into a grid of size binsxbins, PSFs will be + generated at the central coordinates of the grid chunks. + :param str psf_model: Which model to use when generating the PSF, default is ELLBETA, the best available. + :param int num_cores: The number of cores to use (if running locally), default is set to + 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + + psf_model = psf_model.upper() + allowed_models = ["ELLBETA", "LOW", "MEDIUM", "EXTENDED", "HIGH"] + if psf_model not in allowed_models: + raise SASInputInvalid("{0} is not a valid PSF model. Allowed models are " + "{1}".format(psf_model, ", ".join(allowed_models))) + + # Need a valid CIF for this task, so run cifbuild first + cifbuild(sources, disable_progress=disable_progress, num_cores=num_cores) + + # This is necessary because the decorator will reduce a one element list of source objects to a single + # source object. Useful for the user, not so much here where the code expects an iterable. + if not isinstance(sources, (list, BaseSample)): + sources = [sources] + + # NullSources are not allowed to be mosaiced, as they can have any observations associated and thus won't + # necessarily overlap + if isinstance(sources, NullSource): + raise NotImplementedError("You cannot currently use PSFGen with a NullSource.") + + with tqdm(desc='Preparing PSF generation commands', total=len(sources), + disable=len(sources) == 0) as psfgen_prep_progress: + # These lists are to contain the lists of commands/paths/etc for each of the individual sources passed + # to this function + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for source in sources: + cmds = [] + final_paths = [] + extra_info = [] + # Check which event lists are associated with each individual source + for pack in source.get_products("events", just_obj=False): + obs_id = pack[0] + inst = pack[1] + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + # This looks for any image for this ObsID, instrument combo - it does assume that whatever + # it finds will be the same resolution as any images in other energy bands that XGA will + # create in the future. + images = source.get_products("image", obs_id, inst, just_obj=True) + + if len(images) == 0: + raise NoProductAvailableError("There is no image available for {o} {i}, please generate " + "images before PSFs".format(o=obs_id, i=inst)) + + # Checking if the Image products are the same shape that XGA makes + res_match = [im for im in images if im.shape == (512, 512)] + if len(res_match) == 0: + raise NoProductAvailableError("There is an image associated with {o} {i}, but it doesn't" + " appear to be at the resolution XGA uses - this is not " + "supported yet.") + else: + image = res_match[0] + + # Here we try and find if this PSF configuration has already been run and has been + # associated with the source. If so then don't do it again. + psfs = source.get_products("psf", obs_id, inst, extra_key=psf_model + "_" + str(bins)) + if len(psfs) != 0: + continue + + # This part is where we decide on the RA DEC coordinates for the centres of each + # PSF in our grid + # This function gives us x and y limits for where there is data in an image, they are used as start + # and end coordinates for our bins so the PSFs are more focused on where there is actually data. + x_lims, y_lims = data_limits(image) + # Simple calculation to calculate step size in pixels, so how long each chunk will be in + # x and y directions + x_step = (x_lims[1] - x_lims[0]) / bins + y_step = (y_lims[1] - y_lims[0]) / bins + + # These are the x and y bin centre coordinates - when converted to RA and DEC this is where the + # PSF is generated at. + x_cen_coords = np.arange(*x_lims, x_step) + (x_step / 2) + y_cen_coords = np.arange(*y_lims, y_step) + (y_step / 2) + + # Get all combinations of the central coordinates using meshgrid, then turn them into + # an N row, 2 column numpy array of pixel coordinates for easy conversion to RA-DEC. + pix_mesh = np.meshgrid(x_cen_coords, y_cen_coords) + pix_coords = Quantity(np.stack([pix_mesh[0].ravel(), pix_mesh[1].ravel()]).T, 'pix') + + # But I also want to know the boundaries of the bins so I can easily select which parts of + # the image belong with each PSF in the grid + x_boundaries = np.linspace(*x_lims, bins+1) + y_boundaries = np.linspace(*y_lims, bins+1) + + # These two arrays give the x and y boundaries of the bins in the same order as the pix_coords array + x_bound_coords = np.tile(np.stack([x_boundaries[0: -1].ravel(), x_boundaries[1:].ravel()]).T, + (bins, 1)) + x_bound_coords = x_bound_coords.round(0).astype(int) + + y_bound_coords = np.repeat(np.stack([y_boundaries[0: -1].ravel(), y_boundaries[1:].ravel()]).T, + bins, 0) + y_bound_coords = y_bound_coords.round(0).astype(int) + + ra_dec_coords = image.coord_conv(pix_coords, deg) + + dest_dir = OUTPUT + "{o}/{i}_{n}_temp/".format(o=obs_id, i=inst, n=source.name) + psf = "{o}_{i}_{b}bin_{m}mod_{ra}_{dec}_psf.fits" + + # The change directory and SAS setup commands + init_cmd = "cd {d}; cp ../ccf.cif .; export SAS_CCF={ccf}; ".format(d=dest_dir, + ccf=dest_dir + "ccf.cif") + + # If something got interrupted and the temp directory still exists, this will remove it + if os.path.exists(dest_dir): + rmtree(dest_dir) + + os.makedirs(dest_dir) + + psf_files = [] + total_cmd = init_cmd + for pair_ind in range(ra_dec_coords.shape[0]): + # The coordinates at which this PSF will be generated + ra, dec = ra_dec_coords[pair_ind, :].value + + psf_file = psf.format(o=obs_id, i=inst, b=bins, ra=ra, dec=dec, m=psf_model) + psf_files.append(os.path.join(OUTPUT, obs_id, psf_file)) + # Going with xsize and ysize as 400 pixels, I think its enough and quite a bit faster than 1000 + total_cmd += "psfgen image={i} coordtype=EQPOS level={m} energy=1000 xsize=400 ysize=400 x={ra} " \ + "y={dec} output={p}; ".format(i=image.path, m=psf_model, ra=ra, dec=dec, p=psf_file) + + total_cmd += "mv * ../; cd ..; rm -r {d}".format(d=dest_dir) + cmds.append(total_cmd) + # This is the products final resting place, if it exists at the end of this command + # In this case it just checks for the final PSF in the grid, all other files in the grid + # get stored in extra info. + final_paths.append(os.path.join(OUTPUT, obs_id, psf_file)) + extra_info.append({"obs_id": obs_id, "instrument": inst, "model": psf_model, "chunks_per_side": bins, + "files": psf_files, "x_bounds": x_bound_coords, "y_bounds": y_bound_coords}) + + sources_cmds.append(np.array(cmds)) + sources_paths.append(np.array(final_paths)) + # This contains any other information that will be needed to instantiate the class + # once the SAS cmd has run + sources_extras.append(np.array(extra_info)) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value="psf")) + + psfgen_prep_progress.update(1) + + # I only return num_cores here so it has a reason to be passed to this function, really + # it could just be picked up in the decorator. + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + diff --git a/xga/generate/sas/run.py b/xga/generate/sas/run.py new file mode 100644 index 00000000..0f746dfd --- /dev/null +++ b/xga/generate/sas/run.py @@ -0,0 +1,279 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 09/08/2023, 13:01. Copyright (c) The Contributors + +from functools import wraps +from multiprocessing.dummy import Pool +from subprocess import Popen, PIPE +from typing import Tuple +from warnings import warn + +from tqdm import tqdm + +from .. import SAS_AVAIL, SAS_VERSION +from ..exceptions import SASGenerationError, SASNotFoundError +from ..products import BaseProduct, Image, ExpMap, Spectrum, PSFGrid, AnnularSpectra +from ..samples.base import BaseSample +from ..sources import BaseSource +from ..sources.base import NullSource + + +def execute_cmd(cmd: str, p_type: str, p_path: list, extra_info: dict, src: str) -> Tuple[BaseProduct, str]: + """ + This function is called for the local compute option, and runs the passed command in a Popen shell. + It then creates an appropriate product object, and passes it back to the callback function of the Pool + it was called from. + + :param str cmd: SAS command to be executed on the command line. + :param str p_type: The product type that will be produced by this command. + :param str p_path: The final output path of the product. + :param dict extra_info: Any extra information required to define the product object. + :param str src: A string representation of the source object that this product is associated with. + :return: The product object, and the string representation of the associated source object. + :rtype: Tuple[BaseProduct, str] + """ + out, err = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE).communicate() + out = out.decode("UTF-8", errors='ignore') + err = err.decode("UTF-8", errors='ignore') + + # This part for defining an image object used to make sure that the src wasn't a NullSource, as defining product + # objects is wasteful considering the purpose of a NullSource, but generating exposure maps requires a + # pre-existing image + if p_type == "image": + # Maybe let the user decide not to raise errors detected in stderr + prod = Image(p_path[0], extra_info["obs_id"], extra_info["instrument"], out, err, cmd, extra_info["lo_en"], + extra_info["hi_en"]) + if "psf_corr" in extra_info and extra_info["psf_corr"]: + prod.psf_corrected = True + prod.psf_bins = extra_info["psf_bins"] + prod.psf_model = extra_info["psf_model"] + prod.psf_iterations = extra_info["psf_iter"] + prod.psf_algorithm = extra_info["psf_algo"] + elif p_type == "expmap": + prod = ExpMap(p_path[0], extra_info["obs_id"], extra_info["instrument"], out, err, cmd, + extra_info["lo_en"], extra_info["hi_en"]) + elif p_type == "ccf" and "NullSource" not in src: + # ccf files may not be destined to spend life as product objects, but that doesn't mean + # I can't take momentarily advantage of the error parsing I built into the product classes + prod = BaseProduct(p_path[0], "", "", out, err, cmd) + elif (p_type == "spectrum" or p_type == "annular spectrum set components") and "NullSource" not in src: + prod = Spectrum(p_path[0], extra_info["rmf_path"], extra_info["arf_path"], extra_info["b_spec_path"], + extra_info['central_coord'], extra_info["inner_radius"], extra_info["outer_radius"], + extra_info["obs_id"], extra_info["instrument"], extra_info["grouped"], extra_info["min_counts"], + extra_info["min_sn"], extra_info["over_sample"], out, err, cmd, extra_info["from_region"], + extra_info["b_rmf_path"], extra_info["b_arf_path"]) + elif p_type == "psf" and "NullSource" not in src: + prod = PSFGrid(extra_info["files"], extra_info["chunks_per_side"], extra_info["model"], + extra_info["x_bounds"], extra_info["y_bounds"], extra_info["obs_id"], + extra_info["instrument"], out, err, cmd) + elif p_type == "cross arfs": + prod = BaseProduct(p_path[0], extra_info['obs_id'], extra_info['inst'], out, err, cmd, extra_info) + elif "NullSource" in src: + prod = None + else: + raise NotImplementedError("Not implemented yet") + + # An extra step is required for annular spectrum set components + if p_type == "annular spectrum set components": + prod.annulus_ident = extra_info["ann_ident"] + prod.set_ident = extra_info["set_ident"] + + return prod, src + + +def sas_call(sas_func): + """ + This is used as a decorator for functions that produce SAS command strings. Depending on the + system that XGA is running on (and whether the user requests parallel execution), the method of + executing the SAS command will change. This supports both simple multi-threading and submission + with the Sun Grid Engine. + :return: + """ + # This is a horrible bodge to make Pycharm not remove SAS_AVAIL and SAS_VERSION from import when it cleans + # up prior to committing. + new_sas_avail = SAS_AVAIL + new_sas_version = SAS_VERSION + + @wraps(sas_func) + def wrapper(*args, **kwargs): + # This has to be here to let autodoc do its noble work without falling foul of these errors + if not new_sas_avail and new_sas_version is None: + raise SASNotFoundError("No SAS installation has been found on this machine") + elif not new_sas_avail: + raise SASNotFoundError( + "A SAS installation (v{}) has been found, but the SAS_CCFPATH environment variable is" + " not set.".format(new_sas_version)) + + # The first argument of all of these SAS functions will be the source object (or a list of), + # so rather than return them from the sas function I'll just access them like this. + if isinstance(args[0], (BaseSource, NullSource)): + sources = [args[0]] + elif isinstance(args[0], (BaseSample, list)): + sources = args[0] + else: + raise TypeError("Please pass a source, NullSource, or sample object.") + + # This is the output from whatever function this is a decorator for + cmd_list, to_stack, to_execute, cores, p_type, paths, extra_info, disable = sas_func(*args, **kwargs) + + src_lookup = {} + all_run = [] # Combined command list for all sources + all_type = [] # Combined expected type list for all sources + all_path = [] # Combined expected path list for all sources + all_extras = [] # Combined extra information list for all sources + source_rep = [] # For repr calls of each source object, needed for assigning products to sources + for ind in range(len(cmd_list)): + source = sources[ind] + if len(cmd_list[ind]) > 0: + src_lookup[repr(source)] = ind + # If there are commands to add to a source queue, then do it + source.update_queue(cmd_list[ind], p_type[ind], paths[ind], extra_info[ind], to_stack) + + # If we do want to execute the commands this time round, we read them out for all sources + # and add them to these master lists + if to_execute: + to_run, expected_type, expected_path, extras = source.get_queue() + all_run += to_run + all_type += expected_type + all_path += expected_path + all_extras += extras + source_rep += [repr(source)] * len(to_run) + + # This is what the returned products get stored in before they're assigned to sources + results = {s: [] for s in src_lookup} + # Any errors raised shouldn't be SAS, as they are stored within the product object. + raised_errors = [] + # Making sure something is defined for this variable + prod_type_str = "" + if to_execute and len(all_run) > 0: + # Will run the commands locally in a pool + prod_type_str = ", ".join(set(all_type)) + with tqdm(total=len(all_run), desc="Generating products of type(s) " + prod_type_str, + disable=disable) as gen, Pool(cores) as pool: + def callback(results_in: Tuple[BaseProduct, str]): + """ + Callback function for the apply_async pool method, gets called when a task finishes + and something is returned. + :param Tuple[BaseProduct, str] results_in: Results of the command call. + """ + nonlocal gen # The progress bar will need updating + nonlocal results # The dictionary the command call results are added to + if results_in[0] is None: + gen.update(1) + return + else: + prod_obj, rel_src = results_in + results[rel_src].append(prod_obj) + gen.update(1) + + def err_callback(err): + """ + The callback function for errors that occur inside a task running in the pool. + :param err: An error that occurred inside a task. + """ + nonlocal raised_errors + nonlocal gen + + if err is not None: + # Rather than throwing an error straight away I append them all to a list for later. + raised_errors.append(err) + gen.update(1) + + for cmd_ind, cmd in enumerate(all_run): + # These are just the relevant entries in all these lists for the current command + # Just defined like this to save on line length for apply_async call. + exp_type = all_type[cmd_ind] + exp_path = all_path[cmd_ind] + ext = all_extras[cmd_ind] + src = source_rep[cmd_ind] + pool.apply_async(execute_cmd, args=(str(cmd), str(exp_type), exp_path, ext, src), + error_callback=err_callback, callback=callback) + pool.close() # No more tasks can be added to the pool + pool.join() # Joins the pool, the code will only move on once the pool is empty. + + elif to_execute and len(all_run) == 0: + # It is possible to call a wrapped SAS function and find that the products already exist. + # print("All requested products already exist") + pass + + # Now we assign products to source objects + all_to_raise = [] + # This is for the special case of generating an AnnularSpectra product + ann_spec_comps = {k: [] for k in results} + for entry in results: + # Made this lookup list earlier, using string representations of source objects. + # Finds the ind of the list of sources that we should add this set of products to + ind = src_lookup[entry] + to_raise = [] + for product in results[entry]: + product: BaseProduct + ext_info = "- {s} is the associated source, the specific data used is " \ + "{o}-{i}.".format(s=sources[ind].name, o=product.obs_id, i=product.instrument) + if len(product.sas_errors) == 1: + to_raise.append(SASGenerationError(product.sas_errors[0] + ext_info)) + elif len(product.sas_errors) > 1: + errs = [SASGenerationError(e + ext_info) for e in product.sas_errors] + to_raise += errs + + if len(product.errors) == 1: + to_raise.append(SASGenerationError(product.errors[0] + "-" + ext_info)) + elif len(product.errors) > 1: + errs = [SASGenerationError(e + "-" + ext_info) for e in product.errors] + to_raise += errs + + # ccfs aren't actually stored in the source product storage, but they are briefly put into + # BaseProducts for error parsing etc. So if the product type is None we don't store it + if product.type is not None and product.usable and prod_type_str != "annular spectrum set components": + # For each product produced for this source, we add it to the storage hierarchy + sources[ind].update_products(product) + elif product.type is not None and product.usable and prod_type_str == "annular spectrum set components": + # Really we're just re-creating the results dictionary here, but I want these products + # to go through the error checking stuff like everything else does + ann_spec_comps[entry].append(product) + # In case they are components of an annular spectrum but they are either none or not usable + elif prod_type_str == "annular spectrum set components": + warn("An annular spectrum component ({a}) for {o}{i} has not been generated properly, contact " + "the development team if a SAS error is not " + "shown.".format(a=product.storage_key, o=product.obs_id, i=product.instrument), stacklevel=2) + # Here the generated product was a cross-arf, and needs to be added to the right annular spectrum + # object that already exists in our source + elif prod_type_str == "cross arfs": + # OH NO WE'RE USING A PROTECTED ATTRIBUTE - but don't worry, I didn't give this a property + # deliberately to hopefully discourage any user from doing anything with it + ei = product._extra_info + ann_spec = sources[ind].get_annular_spectra(set_id=ei['ann_spec_set_id']) + ann_spec.add_cross_arf(product, ei['obs_id'], ei['inst'], ei['src_ann_id'], ei['cross_ann_id'], + ei['ann_spec_set_id']) + + if len(to_raise) != 0: + all_to_raise.append(to_raise) + + if prod_type_str == "annular spectrum set components": + for entry in ann_spec_comps: + # So now we pass the list of spectra to a AnnularSpectra definition - and it will sort them out + # itself so the order doesn't matter + ann_spec = AnnularSpectra(ann_spec_comps[entry]) + # Refresh the value of ind so that the correct source is used for radii conversion and so that + # the AnnularSpectra is added to the correct source. + ind = src_lookup[entry] + if sources[ind].redshift is not None: + # If we know the redshift we will add the radii to the annular spectra in proper distance units + ann_spec.proper_radii = sources[ind].convert_radius(ann_spec.radii, 'kpc') + # And adding our exciting new set of annular spectra into the storage structure + sources[ind].update_products(ann_spec) + + # Errors raised here should not be to do with SAS generation problems, but other purely pythonic errors + for error in raised_errors: + raise error + + # And here are all the errors during SAS generation, if any + if len(all_to_raise) != 0: + raise SASGenerationError(all_to_raise) + + # If only one source was passed, turn it back into a source object rather than a source + # object in a list. + if len(sources) == 1: + sources = sources[0] + return sources + return wrapper + + diff --git a/xga/generate/sas/spec.py b/xga/generate/sas/spec.py new file mode 100644 index 00000000..5a9c513f --- /dev/null +++ b/xga/generate/sas/spec.py @@ -0,0 +1,849 @@ +# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS). +# Last modified by David J Turner (turne540@msu.edu) 11/05/2023, 15:46. Copyright (c) The Contributors + +import os +from copy import copy +from itertools import permutations +from random import randint +from typing import Union, List + +import numpy as np +from astropy.units import Quantity + +from ._common import region_setup, _gen_detmap_cmd +from .misc import cifbuild +from .. import OUTPUT, NUM_CORES +from ..exceptions import SASInputInvalid, NotAssociatedError, NoProductAvailableError +from ..samples.base import BaseSample +from ..sas.run import sas_call +from ..sources import BaseSource, ExtendedSource, GalaxyCluster + + +def _spec_cmds(sources: Union[BaseSource, BaseSample], outer_radius: Union[str, Quantity], + inner_radius: Union[str, Quantity] = Quantity(0, 'arcsec'), group_spec: bool = True, + min_counts: int = 5, min_sn: float = None, over_sample: int = None, one_rmf: bool = True, + num_cores: int = NUM_CORES, disable_progress: bool = False, force_gen: bool = False): + """ + An internal function to generate all the commands necessary to produce an evselect spectrum, but is not + decorated by the sas_call function, so the commands aren't immediately run. This means it can be used for + evselect functions that generate custom sets of spectra (like a set of annular spectra for instance), as well + as for things like the standard evselect_spectrum function which produce relatively boring spectra. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param str/Quantity outer_radius: The name or value of the outer radius to use for the generation of + the spectrum (for instance 'r200' would be acceptable for a GalaxyCluster, or Quantity(1000, 'kpc')). If + 'region' is chosen (to use the regions in region files), then any inner radius will be ignored. + :param str/Quantity inner_radius: The name or value of the inner radius to use for the generation of + the spectrum (for instance 'r500' would be acceptable for a GalaxyCluster, or Quantity(300, 'kpc')). By + default this is zero arcseconds, resulting in a circular spectrum. + :param bool group_spec: A boolean flag that sets whether generated spectra are grouped or not. + :param float min_counts: If generating a grouped spectrum, this is the minimum number of counts per channel. + To disable minimum counts set this parameter to None. + :param float min_sn: If generating a grouped spectrum, this is the minimum signal to noise in each channel. + To disable minimum signal to noise set this parameter to None. + :param int over_sample: The minimum energy resolution for each group, set to None to disable. e.g. if + over_sample=3 then the minimum width of a group is 1/3 of the resolution FWHM at that energy. + :param bool one_rmf: This flag tells the method whether it should only generate one RMF for a particular + ObsID-instrument combination - this is much faster in some circumstances, however the RMF does depend + slightly on position on the detector. + :param int num_cores: The number of cores to use, default is set to 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + :param bool force_gen: This boolean flag will force the regeneration of spectra, even if they already exist. + """ + # This function supports passing both individual sources and sets of sources + if isinstance(sources, BaseSource): + sources = [sources] + + if outer_radius != 'region': + from_region = False + sources, inner_radii, outer_radii = region_setup(sources, outer_radius, inner_radius, disable_progress, '') + else: + # This is used in the extra information dictionary for when the XGA spectrum object is defined + from_region = True + + # Just make sure these values are the expect data type, this matters when the information is + # added to the storage strings and file names + if over_sample is not None: + over_sample = int(over_sample) + if min_counts is not None: + min_counts = int(min_counts) + if min_sn is not None: + min_sn = float(min_sn) + + # These check that the user hasn't done something silly like passing multiple grouping options, this is not + # allowed by SAS, will cause the generation to fail + if all([o is not None for o in [min_counts, min_sn]]): + raise SASInputInvalid("evselect only allows one grouping option to be passed, you can't group both by" + " minimum counts AND by minimum signal to noise.") + # Should also check that the user has passed any sort of grouping argument, if they say they want to group + elif group_spec and all([o is None for o in [min_counts, min_sn]]): + raise SASInputInvalid("If you set group_spec=True, you must supply a grouping option, either min_counts" + " or min_sn.") + + # Sets up the extra part of the storage key name depending on if grouping is enabled + if group_spec and min_counts is not None: + extra_name = "_mincnt{}".format(min_counts) + elif group_spec and min_sn is not None: + extra_name = "_minsn{}".format(min_sn) + else: + extra_name = '' + + # Have to make sure that all observations have an up to date cif file. + cifbuild(sources, disable_progress=disable_progress, num_cores=num_cores) + + # And if it was oversampled during generation then we need to include that as well + if over_sample is not None: + extra_name += "_ovsamp{ov}".format(ov=over_sample) + + # Define the various SAS commands that need to be populated, for a useful spectrum you also need ARF/RMF + spec_cmd = "cd {d}; cp ../ccf.cif .; export SAS_CCF={ccf}; evselect table={e} withspectrumset=yes " \ + "spectrumset={s} energycolumn=PI spectralbinsize=5 withspecranges=yes specchannelmin=0 " \ + "specchannelmax={u} {ex}" + + # The detmap in this context is just an image of the source distribution of observation, in detector coordinates, + # and is used to weight the generation of the ARF curves. + detmap_cmd = "evselect table={e} imageset={d} xcolumn=DETX ycolumn=DETY imagebinning=binSize ximagebinsize=100 " \ + "yimagebinsize=100 {ex}" + + # This command just makes a standard XCS image, but will be used to generate images to debug the drilling + # out of regions, as the spectrum expression will be supplied so we can see exactly what data has been removed. + debug_im = "evselect table={e} imageset={i} xcolumn=X ycolumn=Y ximagebinsize=87 " \ + "yimagebinsize=87 squarepixels=yes ximagesize=512 yimagesize=512 imagebinning=binSize " \ + "ximagemin=3649 ximagemax=48106 withxranges=yes yimagemin=3649 yimagemax=48106 withyranges=yes {ex}" + + rmf_cmd = "rmfgen rmfset={r} spectrumset='{s}' detmaptype={dt} detmaparray={ds} extendedsource={es}" + + # Don't need to run backscale separately, as this arfgen call will do it automatically + arf_cmd = "arfgen spectrumset={s} arfset={a} withrmfset=yes rmfset={r} badpixlocation={e} " \ + "extendedsource={es} detmaptype={dt} detmaparray={ds} setbackscale=no badpixmaptype={dt}" + + bscal_cmd = "backscale spectrumset={s} badpixlocation={e}" + + # If the user wants to group spectra, then we'll need this template command: + grp_cmd = "specgroup spectrumset={s} overwrite=yes backgndset={b} arfset={a} rmfset={r} addfilenames=no" + + stack = False # This tells the sas_call routine that this command won't be part of a stack + execute = True # This should be executed immediately + + sources_cmds = [] + sources_paths = [] + sources_extras = [] + sources_types = [] + for s_ind, source in enumerate(sources): + # rmfgen and arfgen both take arguments that describe if something is an extended source or not, + # so we check the source type + if isinstance(source, (ExtendedSource, GalaxyCluster)): + ex_src = "yes" + # Sets the detmap type, using an image of the source is appropriate for extended sources like clusters, + # but not for point sources + dt = 'dataset' + else: + ex_src = "no" + dt = 'flat' + cmds = [] + final_paths = [] + extra_info = [] + + if outer_radius != 'region': + # Finding interloper regions within the radii we have specified has been put here because it all works in + # degrees and as such only needs to be run once for all the different observations. + interloper_regions = source.regions_within_radii(inner_radii[s_ind], outer_radii[s_ind], + source.default_coord) + # This finds any regions which + back_inter_reg = source.regions_within_radii(outer_radii[s_ind] * source.background_radius_factors[0], + outer_radii[s_ind] * source.background_radius_factors[1], + source.default_coord) + src_inn_rad_str = inner_radii[s_ind].value + src_out_rad_str = outer_radii[s_ind].value + # The key under which these spectra will be stored + spec_storage_name = "ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}" + spec_storage_name = spec_storage_name.format(ra=source.default_coord[0].value, + dec=source.default_coord[1].value, + ri=src_inn_rad_str, ro=src_out_rad_str, gr=group_spec) + else: + spec_storage_name = "region_grp{gr}".format(gr=group_spec) + + # Adds on the extra information about grouping to the storage key + spec_storage_name += extra_name + + # Check which event lists are associated with each individual source + for pack in source.get_products("events", just_obj=False): + obs_id = pack[0] + inst = pack[1] + + if not os.path.exists(OUTPUT + obs_id): + os.mkdir(OUTPUT + obs_id) + + # Got to check if this spectrum already exists + exists = source.get_products("spectrum", obs_id, inst, extra_key=spec_storage_name) + if len(exists) == 1 and exists[0].usable and not force_gen: + continue + + # If there is no match to a region, the source region returned by this method will be None, + # and if the user wants to generate spectra from region files, we have to ignore that observations + if outer_radius == "region" and source.source_back_regions("region", obs_id)[0] is None: + continue + + # Because the region will be different for each ObsID, I have to call the setup function here + if outer_radius == 'region': + interim_source, inner_radii, outer_radii = region_setup([source], outer_radius, inner_radius, + disable_progress, obs_id) + # Need the reg for central coordinates + reg = source.source_back_regions('region', obs_id)[0] + reg_cen_coords = Quantity([reg.center.ra.value, reg.center.dec.value], 'deg') + # Pass the largest outer radius here, so we'll look for interlopers in a circle with the radius + # being the largest axis of the ellipse + interloper_regions = source.regions_within_radii(inner_radii[0][0], max(outer_radii[0]), reg_cen_coords) + back_inter_reg = source.regions_within_radii(max(outer_radii[0]) * source.background_radius_factors[0], + max(outer_radii[0]) * source.background_radius_factors[1], + reg_cen_coords) + + reg = source.get_annular_sas_region(inner_radii[0], outer_radii[0], obs_id, inst, + interloper_regions=interloper_regions, central_coord=reg_cen_coords, + rot_angle=reg.angle) + b_reg = source.get_annular_sas_region(outer_radii[0] * source.background_radius_factors[0], + outer_radii[0] * source.background_radius_factors[1], obs_id, + inst, interloper_regions=back_inter_reg, + central_coord=source.default_coord) + # Explicitly read out the current inner radius and outer radius, useful for some bits later + src_inn_rad_str = 'and'.join(inner_radii[0].value.astype(str)) + src_out_rad_str = 'and'.join(outer_radii[0].value.astype(str)) + "_region" + # Also explicitly read out into variables the actual radii values + inn_rad_degrees = inner_radii[0] + out_rad_degrees = outer_radii[0] + + else: + # This constructs the sas strings for any radius that isn't 'region' + reg = source.get_annular_sas_region(inner_radii[s_ind], outer_radii[s_ind], obs_id, inst, + interloper_regions=interloper_regions, + central_coord=source.default_coord) + b_reg = source.get_annular_sas_region(outer_radii[s_ind] * source.background_radius_factors[0], + outer_radii[s_ind] * source.background_radius_factors[1], obs_id, + inst, interloper_regions=back_inter_reg, + central_coord=source.default_coord) + inn_rad_degrees = inner_radii[s_ind] + out_rad_degrees = outer_radii[s_ind] + + # Some settings depend on the instrument, XCS uses different patterns for different instruments + if "pn" in inst: + # Also the upper channel limit is different for EPN and EMOS detectors + spec_lim = 20479 + expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0) && {s}'".format(s=reg) + b_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0) && {s}'".format(s=b_reg) + # This is an expression without region information to be used for making the detmaps + # required for ARF generation, we start off assuming we'll use a MOS observation as the detmap + d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'" + + # The detmap for the arfgen call should ideally not be from the same instrument as the observation, + # so for PN we preferentially select MOS2 (as MOS1 was damaged). However if there isn't a MOS2 + # events list from the same observation then we select MOS1, and failing that we use PN. + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos2')[0] + except NotAssociatedError: + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos1')[0] + except NotAssociatedError: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0] + # If all is lost and there are no MOS event lists then we must revert to the PN expression + d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'" + + elif "mos" in inst: + spec_lim = 11999 + expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0) && {s}'".format(s=reg) + b_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0) && {s}'".format(s=b_reg) + # This is an expression without region information to be used for making the detmaps + # required for ARF generation, we start off assuming we'll use the PN observation as the detmap + d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'" + + # The detmap for the arfgen call should ideally not be from the same instrument as the observation, + # so for MOS observations we preferentially select PN. However if there isn't a PN events list + # from the same observation then for MOS2 we select MOS1, and for MOS1 we select MOS2 (as they + # are rotated wrt one another it is still semi-valid), and failing that MOS2 will get MOS2 and MOS1 + # will get MOS1. + if inst[-1] == 1: + cur = '1' + opp = '2' + else: + cur = '2' + opp = '1' + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0] + except NotAssociatedError: + # If we must use a MOS detmap then we have to use the MOS expression + d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'" + try: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos'+opp)[0] + except NotAssociatedError: + detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos'+cur)[0] + else: + raise ValueError("You somehow have an illegal value for the instrument name...") + + # Some of the SAS tasks have issues with filenames with a '+' in them for some reason, so this + # replaces any + symbols that may be in the source name with another character + source_name = source.name.replace("+", "x") + + # Just grabs the event list object + evt_list = pack[-1] + # Sets up the file names of the output files, adding a random number so that the + # function for generating annular spectra doesn't clash and try to use the same folder + dest_dir = OUTPUT + "{o}/{i}_{n}_temp_{r}/".format(o=obs_id, i=inst, n=source_name, r=randint(0, 1e+8)) + + # Sets up something very similar to the extra name variable above, but for the file names + # Stores some information about grouping in the file names + if group_spec and min_counts is not None: + extra_file_name = "_mincnt{c}".format(c=min_counts) + elif group_spec and min_sn is not None: + extra_file_name = "_minsn{s}".format(s=min_sn) + else: + extra_file_name = '' + + # And if it was oversampled during generation then we need to include that as well + if over_sample is not None: + extra_file_name += "_ovsamp{ov}".format(ov=over_sample) + + spec = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_spec.fits" + spec = spec.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, gr=group_spec, + ex=extra_file_name) + b_spec = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_backspec.fits" + b_spec = b_spec.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + gr=group_spec, ex=extra_file_name) + # Arguably checking whether this is an extended source and adjusting this is irrelevant as the file + # name won't be used anyway with detmaptype set to flat, but I'm doing it anyway + if ex_src: + det_map = "{o}_{i}_detmap.fits" + det_map = det_map.format(o=detmap_evts.obs_id, i=detmap_evts.instrument) + else: + det_map = "" + arf = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}.arf" + arf = arf.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, gr=group_spec, + ex=extra_file_name) + # b_arf = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_back.arf" + # b_arf = b_arf.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + # dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + # gr=group_spec, ex=extra_file_name) + ccf = dest_dir + "ccf.cif" + + # These file names are for the debug images of the source and background images, they will not be loaded + # in as a XGA products, but exist purely to check by eye if necessary + dim = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_debug." \ + "fits".format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + gr=group_spec, ex=extra_file_name) + b_dim = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_back_debug." \ + "fits".format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + gr=group_spec, ex=extra_file_name) + + # Fills out the evselect command to make the main and background spectra + s_cmd_str = spec_cmd.format(d=dest_dir, ccf=ccf, e=evt_list.path, s=spec, u=spec_lim, ex=expr) + sb_cmd_str = spec_cmd.format(d=dest_dir, ccf=ccf, e=evt_list.path, s=b_spec, u=spec_lim, ex=b_expr) + + # Does the same thing for the evselect command to make the detmap + d_cmd_str = detmap_cmd.format(e=detmap_evts.path, d=det_map, ex=d_expr) + + # Populates the debug image commands + dim_cmd_str = debug_im.format(e=evt_list.path, ex=expr, i=dim) + b_dim_cmd_str = debug_im.format(e=evt_list.path, ex=b_expr, i=b_dim) + + # This chunk adds rmfgen commands depending on whether we're using a universal RMF or + # an individual one for each spectrum. Also adds arfgen commands on the end, as they depend on + # the rmf. + if one_rmf: + rmf = "{o}_{i}_{n}_universal.rmf".format(o=obs_id, i=inst, n=source_name) + # b_rmf = rmf + else: + rmf = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}.rmf" + rmf = rmf.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + gr=group_spec, ex=extra_file_name) + # b_rmf = "{o}_{i}_{n}_ra{ra}_dec{dec}_ri{ri}_ro{ro}_grp{gr}{ex}_back.rmf" + # b_rmf = b_rmf.format(o=obs_id, i=inst, n=source_name, ra=source.default_coord[0].value, + # dec=source.default_coord[1].value, ri=src_inn_rad_str, ro=src_out_rad_str, + # gr=group_spec, ex=extra_file_name) + + final_rmf_path = OUTPUT + obs_id + '/' + rmf + if one_rmf and (not os.path.exists(final_rmf_path) or force_gen): + cmd_str = ";".join([s_cmd_str, dim_cmd_str, b_dim_cmd_str, d_cmd_str, + rmf_cmd.format(r=rmf, s=spec, es=ex_src, ds=det_map, dt=dt), + arf_cmd.format(s=spec, a=arf, r=rmf, e=evt_list.path, es=ex_src, ds=det_map, dt=dt), + sb_cmd_str, bscal_cmd.format(s=spec, e=evt_list.path), + bscal_cmd.format(s=b_spec, e=evt_list.path)]) + #arf_cmd.format(s=b_spec, a=b_arf, r=b_rmf, e=evt_list.path, es=ex_src, ds=det_map) + elif not one_rmf and (not os.path.exists(final_rmf_path) or force_gen): + cmd_str = ";".join([s_cmd_str, dim_cmd_str, b_dim_cmd_str, d_cmd_str, + rmf_cmd.format(r=rmf, s=spec, es=ex_src, ds=det_map, dt=dt), + arf_cmd.format(s=spec, a=arf, r=rmf, e=evt_list.path, es=ex_src, + ds=det_map, dt=dt)]) + ";" + cmd_str += ";".join([sb_cmd_str, bscal_cmd.format(s=spec, e=evt_list.path), + bscal_cmd.format(s=b_spec, e=evt_list.path)]) + #, rmf_cmd.format(r=b_rmf, s=b_spec, es=ex_src, ds=det_map) + # arf_cmd.format(s=b_spec, a=b_arf, r=b_rmf, e=evt_list.path, es=ex_src, ds=det_map) + else: + # This one just copies the existing universal rmf into the temporary generation folder + cmd_str = "cp {f_rmf} {d};".format(f_rmf=final_rmf_path, d=dest_dir) + cmd_str += ";".join([s_cmd_str, dim_cmd_str, b_dim_cmd_str, d_cmd_str, + arf_cmd.format(s=spec, a=arf, r=rmf, e=evt_list.path, es=ex_src, + ds=det_map, dt=dt)]) + ";" + cmd_str += ";".join([sb_cmd_str, bscal_cmd.format(s=spec, e=evt_list.path), + bscal_cmd.format(s=b_spec, e=evt_list.path)]) + #arf_cmd.format(s=b_spec, a=b_arf, r=b_rmf, e=evt_list.path, es=ex_src, ds=det_map) + + # If the user wants to produce grouped spectra, then this if statement is triggered and adds a specgroup + # command at the end. The groupspec command will replace the ungrouped spectrum. + if group_spec: + new_grp = grp_cmd.format(s=spec, b=b_spec, r=rmf, a=arf) + if min_counts is not None: + new_grp += " mincounts={mc}".format(mc=min_counts) + if min_sn is not None: + new_grp += " minSN={msn}".format(msn=min_sn) + if over_sample is not None: + new_grp += " oversample={os}".format(os=over_sample) + cmd_str += "; " + new_grp + + # Adds clean up commands to move all generated files and remove temporary directory + cmd_str += "; mv * ../; cd ..; rm -r {d}".format(d=dest_dir) + cmds.append(cmd_str) # Adds the full command to the set + # Makes sure the whole path to the temporary directory is created + os.makedirs(dest_dir) + + final_paths.append(os.path.join(OUTPUT, obs_id, spec)) + extra_info.append({"inner_radius": inn_rad_degrees, "outer_radius": out_rad_degrees, + "rmf_path": os.path.join(OUTPUT, obs_id, rmf), + "arf_path": os.path.join(OUTPUT, obs_id, arf), + "b_spec_path": os.path.join(OUTPUT, obs_id, b_spec), + "b_rmf_path": '', + "b_arf_path": '', + "obs_id": obs_id, "instrument": inst, "grouped": group_spec, "min_counts": min_counts, + "min_sn": min_sn, "over_sample": over_sample, "central_coord": source.default_coord, + "from_region": from_region}) + + sources_cmds.append(np.array(cmds)) + sources_paths.append(np.array(final_paths)) + # This contains any other information that will be needed to instantiate the class + # once the SAS cmd has run + sources_extras.append(np.array(extra_info)) + sources_types.append(np.full(sources_cmds[-1].shape, fill_value="spectrum")) + + return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress + + +@sas_call +def evselect_spectrum(sources: Union[BaseSource, BaseSample], outer_radius: Union[str, Quantity], + inner_radius: Union[str, Quantity] = Quantity(0, 'arcsec'), group_spec: bool = True, + min_counts: int = 5, min_sn: float = None, over_sample: float = None, one_rmf: bool = True, + num_cores: int = NUM_CORES, disable_progress: bool = False): + """ + A wrapper for all of the SAS processes necessary to generate an XMM spectrum that can be analysed + in XSPEC. Every observation associated with this source, and every instrument associated with that + observation, will have a spectrum generated using the specified outer and inner radii as a boundary. The + default inner radius is zero, so by default this function will produce circular spectra out to the outer_radius. + It is possible to generate both grouped and ungrouped spectra using this function, with the degree + of grouping set by the min_counts, min_sn, and oversample parameters. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param str/Quantity outer_radius: The name or value of the outer radius to use for the generation of + the spectrum (for instance 'r200' would be acceptable for a GalaxyCluster, or Quantity(1000, 'kpc')). If + 'region' is chosen (to use the regions in region files), then any inner radius will be ignored. If you are + generating for multiple sources then you can also pass a Quantity with one entry per source. + :param str/Quantity inner_radius: The name or value of the inner radius to use for the generation of + the spectrum (for instance 'r500' would be acceptable for a GalaxyCluster, or Quantity(300, 'kpc')). By + default this is zero arcseconds, resulting in a circular spectrum. If you are + generating for multiple sources then you can also pass a Quantity with one entry per source. + :param bool group_spec: A boolean flag that sets whether generated spectra are grouped or not. + :param float min_counts: If generating a grouped spectrum, this is the minimum number of counts per channel. + To disable minimum counts set this parameter to None. + :param float min_sn: If generating a grouped spectrum, this is the minimum signal to noise in each channel. + To disable minimum signal to noise set this parameter to None. + :param float over_sample: The minimum energy resolution for each group, set to None to disable. e.g. if + over_sample=3 then the minimum width of a group is 1/3 of the resolution FWHM at that energy. + :param bool one_rmf: This flag tells the method whether it should only generate one RMF for a particular + ObsID-instrument combination - this is much faster in some circumstances, however the RMF does depend + slightly on position on the detector. + :param int num_cores: The number of cores to use, default is set to 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + # All the workings of this function are in _spec_cmds so that the annular spectrum set generation function + # can also use them + return _spec_cmds(sources, outer_radius, inner_radius, group_spec, min_counts, min_sn, over_sample, one_rmf, + num_cores, disable_progress) + + +@sas_call +def spectrum_set(sources: Union[BaseSource, BaseSample], radii: Union[List[Quantity], Quantity], + group_spec: bool = True, min_counts: int = 5, min_sn: float = None, over_sample: float = None, + one_rmf: bool = True, num_cores: int = NUM_CORES, force_regen: bool = False, + disable_progress: bool = False): + """ + This function can be used to produce 'sets' of XGA Spectrum objects, generated in concentric circular annuli. + Such spectrum sets can be used to measure projected spectroscopic quantities, or even be de-projected to attempt + to measure spectroscopic quantities in a three dimensional space. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param List[Quantity]/Quantity radii: A list of non-scalar quantities containing the boundary radii of the + annuli for the sources. A single quantity containing at least three radii may be passed if one source + is being analysed, but for multiple sources there should be a quantity (with at least three radii), PER + source. + :param bool group_spec: A boolean flag that sets whether generated spectra are grouped or not. + :param float min_counts: If generating a grouped spectrum, this is the minimum number of counts per channel. + To disable minimum counts set this parameter to None. + :param float min_sn: If generating a grouped spectrum, this is the minimum signal to noise in each channel. + To disable minimum signal to noise set this parameter to None. + :param float over_sample: The minimum energy resolution for each group, set to None to disable. e.g. if + over_sample=3 then the minimum width of a group is 1/3 of the resolution FWHM at that energy. + :param bool one_rmf: This flag tells the method whether it should only generate one RMF for a particular + ObsID-instrument combination - this is much faster in some circumstances, however the RMF does depend + slightly on position on the detector. + :param int num_cores: The number of cores to use, default is set to 90% of available. + :param bool force_regen: This will force all the constituent spectra of the set to be regenerated, use this + if your call to this function was interrupted and an incomplete AnnularSpectrum is being read in. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + """ + # If it's a single source I put it into an iterable object (i.e. a list), just for convenience + if isinstance(sources, BaseSource): + sources = [sources] + elif isinstance(sources, list) and not all([isinstance(s, BaseSource) for s in sources]): + raise TypeError("If a list is passed, each element must be a source.") + # And the only other option is a BaseSample instance, so if it isn't that then we get angry + elif not isinstance(sources, (BaseSample, list)): + raise TypeError("Please only pass source or sample objects for the 'sources' parameter of this function") + + # I just want to make sure that nobody passes anything daft for the radii + if isinstance(radii, Quantity) and len(sources) != 1: + raise TypeError("You may only pass a Quantity for the radii parameter if you are only analysing " + "one source. You are attempting to generate spectrum sets for {0} sources, so please pass " + "a list of {0} non-scalar quantities.".format(len(sources))) + elif isinstance(radii, Quantity): + pass + elif isinstance(radii, (list, np.ndarray)) and len(sources) != len(radii): + raise ValueError("The list of quantities passed for the radii parameter must be the same length as the " + "number of sources which you are analysing.") + + # If we've made it to this point then the radii type is fine, but I want to make sure that radii is a list + # of quantities - as expected by the rest of the function + if isinstance(radii, Quantity): + radii = [radii] + + # Check that all radii are passed in the units, I could convert them and make sure but I can't + # be bothered + if len(set([r.unit for r in radii])) != 1: + raise ValueError("Please pass all radii sets in the same units.") + + # I'm also going to check to make sure that every annulus N+1 is further out then annulus N. There is a check + # for this in the spec setup function but if I catch it here I can give a more informative error message + for s_ind, source in enumerate(sources): + # I'll also check that the quantity passed for the radii isn't scalar, and isn't only two long - that's not + # a set of annuli, they should just use evselect_spectrum for that + cur_rad = radii[s_ind] + src_name = source.name + if cur_rad.isscalar: + raise ValueError("The radii quantity you have passed for {s} only has one value in it, this function is " + "for generating a set of multiple annular spectra, I need at least three " + "entries.".format(s=src_name)) + elif len(cur_rad) < 3: + raise ValueError("The radii quantity have you passed for {s} must have at least 3 entries, this " + "would generate a set of 2 annular spectra and is the minimum for this " + "function.".format(s=src_name)) + + # This runs through the radii for this source and makes sure that annulus N+1 is larger than annulus N + greater_check = [cur_rad[r_ind] < cur_rad[r_ind+1] for r_ind in range(0, len(cur_rad)-1)] + if not all(greater_check): + raise ValueError("Not all of the radii passed for {s} are larger than the annulus that " + "precedes them.".format(s=src_name)) + + # Just to make sure calibration files have been generated, though I don't actually think they could + # have gotten to this point without them + cifbuild(sources, num_cores, disable_progress) + + # This generates a spectra between the innermost and outmost radii for each source, and a universal RMF + if one_rmf: + innermost_rads = Quantity([r_set[0] for r_set in radii], radii[0].unit) + outermost_rads = Quantity([r_set[-1] for r_set in radii], radii[0].unit) + evselect_spectrum(sources, outermost_rads, innermost_rads, group_spec, min_counts, min_sn, over_sample, + one_rmf, num_cores, disable_progress) + + # I want to be able to generate all the individual annuli in parallel, but I need them to be associated with + # the correct annuli, which is why I have to iterate through the sources and radii + + # These store the final output information needed to run the commands + all_cmds = [] + all_paths = [] + all_out_types = [] + all_extras = [] + # Iterating through the sources + for s_ind, source in enumerate(sources): + # This generates a random integer ID for this set of spectra + set_id = randint(0, 1e+8) + + # I want to be sure that this configuration doesn't already exist + if group_spec and min_counts is not None: + extra_name = "_mincnt{}".format(min_counts) + elif group_spec and min_sn is not None: + extra_name = "_minsn{}".format(min_sn) + else: + extra_name = '' + + # And if it was oversampled during generation then we need to include that as well + if over_sample is not None: + extra_name += "_ovsamp{ov}".format(ov=over_sample) + + # Combines the annular radii into a string + ann_rad_str = "_".join(source.convert_radius(radii[s_ind], 'deg').value.astype(str)) + spec_storage_name = "ra{ra}_dec{dec}_ar{ar}_grp{gr}" + spec_storage_name = spec_storage_name.format(ra=source.default_coord[0].value, + dec=source.default_coord[1].value, ar=ann_rad_str, gr=group_spec) + + spec_storage_name += extra_name + + exists = source.get_products('combined_spectrum', extra_key=spec_storage_name) + if len(exists) == 0: + # If it doesn't exist then we do need to call evselect_spectrum + generate_spec = True + else: + # If it already exists though we don't need to bother + generate_spec = False + + # This is where the commands/extra information get concatenated from the different annuli + src_cmds = np.array([]) + src_paths = np.array([]) + src_out_types = [] + src_extras = np.array([]) + if generate_spec or force_regen: + # Here we run through all the requested annuli for the current source + for r_ind in range(len(radii[s_ind])-1): + # Generate the SAS commands for the current annulus of the current source, for all observations + spec_cmd_out = _spec_cmds(source, radii[s_ind][r_ind+1], radii[s_ind][r_ind], group_spec, min_counts, + min_sn, over_sample, one_rmf, num_cores, disable_progress, True) + + # Read out some of the output into variables to be modified + interim_paths = spec_cmd_out[5][0] + interim_extras = spec_cmd_out[6][0] + interim_cmds = spec_cmd_out[0][0] + + # Modified paths and commands will be stored in here + new_paths = [] + new_cmds = [] + for p_ind, p in enumerate(interim_paths): + cur_cmd = interim_cmds[p_ind] + + # Split up the current path, so we only modify the actual file name and not any + # other part of the string + split_p = p.split('/') + # We add the set and annulus identifiers + new_spec = split_p[-1].replace("_spec.fits", "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) \ + + "_spec.fits" + # Not enough just to change the name passed through XGA, it has to be changed in + # the SAS commands as well + cur_cmd = cur_cmd.replace(split_p[-1], new_spec) + + # Add the new filename back into the split spec file path + split_p[-1] = new_spec + + # Add an annulus identifier to the extra_info dictionary + interim_extras[p_ind].update({"set_ident": set_id, "ann_ident": r_ind}) + + # Only need to modify the RMF paths if the universal RMF HASN'T been used + if "universal" not in interim_extras[p_ind]['rmf_path']: + # Much the same process as with the spectrum name + split_r = copy(interim_extras[p_ind]['rmf_path']).split('/') + # split_br = copy(interim_extras[p_ind]['b_rmf_path']).split('/') + new_rmf = split_r[-1].replace('.rmf', "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) + ".rmf" + # new_b_rmf = split_br[-1].replace('_back.rmf', "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) \ + # + "_back.rmf" + + # Replacing the names in the SAS commands + cur_cmd = cur_cmd.replace(split_r[-1], new_rmf) + # cur_cmd = cur_cmd.replace(split_br[-1], new_b_rmf) + + split_r[-1] = new_rmf + # split_br[-1] = new_b_rmf + + # Adding the new RMF paths into the extra info dictionary + # interim_extras[p_ind].update({"rmf_path": "/".join(split_r), + # "b_rmf_path": "/".join(split_br)}) + interim_extras[p_ind].update({"rmf_path": "/".join(split_r)}) + + # Same process as RMFs but for the ARF, background ARF, and background spec + split_a = copy(interim_extras[p_ind]['arf_path']).split('/') + # split_ba = copy(interim_extras[p_ind]['b_arf_path']).split('/') + split_bs = copy(interim_extras[p_ind]['b_spec_path']).split('/') + new_arf = split_a[-1].replace('.arf', "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) + ".arf" + # new_b_arf = split_ba[-1].replace('_back.arf', "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) \ + # + "_back.arf" + new_b_spec = split_bs[-1].replace('_backspec.fits', "_ident{si}_{ai}".format(si=set_id, ai=r_ind)) \ + + "_backspec.fits" + + # New names into the commands + cur_cmd = cur_cmd.replace(split_a[-1], new_arf) + # cur_cmd = cur_cmd.replace(split_ba[-1], new_b_arf) + cur_cmd = cur_cmd.replace(split_bs[-1], new_b_spec) + + split_a[-1] = new_arf + # split_ba[-1] = new_b_arf + split_bs[-1] = new_b_spec + + # Update the extra info dictionary some more + # interim_extras[p_ind].update({"arf_path": "/".join(split_a), "b_arf_path": "/".join(split_ba), + # "b_spec_path": "/".join(split_bs)}) + interim_extras[p_ind].update({"arf_path": "/".join(split_a), "b_spec_path": "/".join(split_bs)}) + + # Add the new paths and commands to their respective lists + new_paths.append("/".join(split_p)) + new_cmds.append(cur_cmd) + + src_paths = np.concatenate([src_paths, new_paths]) + # Go through and concatenate things to the source lists defined above + src_cmds = np.concatenate([src_cmds, new_cmds]) + src_out_types += ['annular spectrum set components'] * len(spec_cmd_out[4][0]) + src_extras = np.concatenate([src_extras, interim_extras]) + src_out_types = np.array(src_out_types) + + # This adds the current sources final commands to the 'all sources' lists + all_cmds.append(src_cmds) + all_paths.append(src_paths) + all_out_types.append(src_out_types) + all_extras.append(src_extras) + + # This gets passed back to the sas call function and is used to run the commands + return all_cmds, False, True, num_cores, all_out_types, all_paths, all_extras, disable_progress + + +@sas_call +def cross_arf(sources: Union[BaseSource, BaseSample], radii: Union[List[Quantity], Quantity], + group_spec: bool = True, min_counts: int = 5, min_sn: float = None, over_sample: float = None, + set_id: str = None, detmap_bin: int = 200, num_cores: int = NUM_CORES, disable_progress: bool = False): + """ + This function will generate cross-arfs for annular spectra, which describe the contribution of each annulus + to each other annulus due to XMM's relatively sizeable PSF. The cross-arfs are generated for each instrument + of each observation, and automatically stored in their parent AnnularSpectra instance, both for retrieval by + fitting processes and so that the user can examine them with a plotting method and by retrieving effective + area values from them. + + :param BaseSource/BaseSample sources: A single source object, or a sample of sources. + :param List[Quantity]/Quantity radii: A list of non-scalar quantities containing the boundary radii of the + annuli for the sources. A single quantity containing at least three radii may be passed if one source + is being analysed, but for multiple sources there should be a quantity (with at least three radii), PER + source. This is used to help retrieve the correct annular spectrum. + :param bool group_spec: A boolean flag that sets whether the spectra are grouped or not. This is used to help + retrieve the correct annular spectrum. + :param float min_counts: If retrieving a grouped spectrum, this is the minimum number of counts per channel. + To disable minimum counts set this parameter to None. This is used to help retrieve the correct + annular spectrum. + :param float min_sn: If retrieving a grouped spectrum, this is the minimum signal to noise in each channel. + To disable minimum signal to noise set this parameter to None. This is used to help retrieve the correct + annular spectrum. + :param float over_sample: The minimum energy resolution for each group, set to None to disable. e.g. if + over_sample=3 then the minimum width of a group is 1/3 of the resolution FWHM at that energy. This is + used to help retrieve the correct annular spectrum. + :param str/List[str] set_id: The unique annular spectrum identifier (or a list of them if analysing multiple + sources) that specifies which annular spectrum to use. + :param int detmap_bin: The spatial binning applied to event lists to create the detector maps used in the + calculations of effective areas. The default is 200, smaller values will increase the resolution but will + cause dramatically slower calculations. + :param int num_cores: The number of cores to use, default is set to 90% of available. + :param bool disable_progress: Setting this to true will turn off the SAS generation progress bar. + :return: + """ + # If it's a single source I put it into an iterable object (i.e. a list), just for convenience + if isinstance(sources, BaseSource): + sources = [sources] + elif isinstance(sources, list) and not all([isinstance(s, BaseSource) for s in sources]): + raise TypeError("If a list is passed, each element must be a source.") + # And the only other option is a BaseSample instance, so if it isn't that then we get angry + elif not isinstance(sources, (BaseSample, list)): + raise TypeError("Please only pass source or sample objects for the 'sources' parameter of this function") + + # We want set_id to be iterable as well, so we wrap it in a list if it wasn't already a list or array + if set_id is not None and isinstance(set_id, (list, np.ndarray)): + set_id = [set_id] + # We still want it to be iterable even if the user never specified it, so we make a list of Nones the same + # length as the sources variable + elif set_id is None: + set_id = [None]*len(sources) + + # This will trigger if the user passed too few or too many set ids for the number of sources there are + if len(set_id) != len(sources): + raise ValueError("If an XGA sample has been passed, and AnnularSpectra are being specified with the 'set_id' " + "argument, then a list of set_ids with the same number of entries must be passed.") + + # NOTE - There is no ';' after {dmc} because it will be included in the dmc command, or not. This is because if the + # requested detmap already exists then the command will just be "", and that will make bash upset if there is + # a ";" after it. + arfgen_cmd = "cd {d}; cp ../ccf.cif .; export SAS_CCF={ccf}; {dmc} arfgen spectrumset={s} arfset={a} " \ + "withrmfset=yes rmfset={r} badpixlocation={e} extendedsource=yes detmaptype=dataset " \ + "detmaparray={ds} setbackscale=no badpixmaptype=dataset crossregionarf=yes " \ + "crossreg_spectrumset={crs}; mv * ../; cd ..; rm -r {d}" + + # These store the final output information needed to run the commands + all_cmds = [] + all_paths = [] + all_out_types = [] + all_extras = [] + for src_ind, src in enumerate(sources): + + # This is where the commands/extra information get concatenated from the different annuli + src_cmds = np.array([]) + src_paths = np.array([]) + src_out_types = [] + src_extras = np.array([]) + + try: + ann_spec = src.get_annular_spectra(radii, group_spec, min_counts, min_sn, over_sample, set_id[src_ind]) + except NoProductAvailableError: + # We make our own version of this error + raise NoProductAvailableError("The requested AnnularSpectra cannot be located for {sn}, and this function " + "will not automatically generate annular spectra.".format(sn=src.name)) + + oi_combos = [(o_id, inst) for o_id, insts in ann_spec.instruments.items() for inst in insts] + for oi in oi_combos: + rel_sp_comp = [ann_spec.get_spectra(ann_id, oi[0], oi[1]) for ann_id in ann_spec.annulus_ids] + + for sp_comb in permutations(rel_sp_comp, 2): + obs_id = sp_comb[0].obs_id + inst = sp_comb[0].instrument + evt_list = src.get_products('events', obs_id, inst)[0] + + dest_dir = OUTPUT + "{o}/{i}_{n}_temp_{r}/".format(o=obs_id, i=inst, n=src.name, r=randint(0, 1e+8)) + + if not os.path.exists(dest_dir): + os.makedirs(dest_dir) + + ccf = dest_dir + "ccf.cif" + + det_map_cmd, det_map_cmd_path, det_map_path = _gen_detmap_cmd(src, obs_id, inst, detmap_bin) + + c_arf_name = "{o}_{i}_{n}_".format(o=obs_id, i=inst, n=src.name) + \ + ann_spec.storage_key.split('_ar')[0] + '_grp' + \ + ann_spec.storage_key.split('ar')[-1].split('_grp')[1] + \ + "_ident" + str(ann_spec.set_ident) + \ + '_cross_{inn}_{out}.arf'.format(inn=sp_comb[0].annulus_ident, + out=sp_comb[1].annulus_ident) + c_arf_path = dest_dir + c_arf_name + + cmd = arfgen_cmd.format(d=dest_dir, ccf=ccf, s=sp_comb[0].path, a=c_arf_path, r=sp_comb[0].rmf, + e=evt_list.path, crs=sp_comb[1].path, ds=det_map_cmd_path, dmc=det_map_cmd) + + extra_info = {'detmap_bin': detmap_bin, + 'ann_spec_set_id': ann_spec.set_ident, + 'obs_id': obs_id, + 'inst': inst, + 'src_ann_id': sp_comb[0].annulus_ident, + 'cross_ann_id': sp_comb[1].annulus_ident} + + src_paths = np.concatenate([src_paths, [OUTPUT + "{o}/".format(o=obs_id) + c_arf_name]]) + # Go through and concatenate things to the source lists defined above + src_cmds = np.concatenate([src_cmds, [cmd]]) + src_out_types += ['cross arfs'] * len(src_cmds) + src_extras = np.concatenate([src_extras, [extra_info]]) + + # This adds the current sources final commands to the 'all sources' lists + all_cmds.append(src_cmds) + all_paths.append(src_paths) + all_out_types.append(src_out_types) + all_extras.append(src_extras) + + # This gets passed back to the sas call function and is used to run the commands + return all_cmds, False, True, num_cores, all_out_types, all_paths, all_extras, disable_progress +