Skip to content

Commit

Permalink
moving xga.sas to xga.generate
Browse files Browse the repository at this point in the history
  • Loading branch information
Jessica Pilling committed Aug 11, 2023
1 parent 8d2dc42 commit 3721662
Show file tree
Hide file tree
Showing 6 changed files with 1,941 additions and 0 deletions.
9 changes: 9 additions & 0 deletions xga/generate/sas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
# Last modified by David J Turner (turne540@msu.edu) 09/05/2023, 11:33. Copyright (c) The Contributors

from ._common import region_setup
from .phot import *
from .spec import *



200 changes: 200 additions & 0 deletions xga/generate/sas/_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
# Last modified by David J Turner (turne540@msu.edu) 09/05/2023, 17:11. Copyright (c) The Contributors
import os.path
import warnings
from typing import Union, Tuple, List

from astropy.units import Quantity

from xga.exceptions import NotAssociatedError
from xga.samples import BaseSample
from xga.sources import BaseSource, NullSource, GalaxyCluster
from xga.utils import RAD_LABELS, OUTPUT


def region_setup(sources: Union[BaseSource, BaseSample], outer_radius: Union[str, Quantity],
inner_radius: Union[str, Quantity], disable_progress: bool, obs_id: str) \
-> Tuple[Union[BaseSource, BaseSample], List[Quantity], List[Quantity]]:
"""
The preparation and value checking stage for SAS spectrum generation.
:param BaseSource/BaseSample sources: A single source object, or a sample of sources.
:param str/Quantity outer_radius: The name or value of the outer radius to use for the generation of
the spectrum (for instance 'r200' would be acceptable for a GalaxyCluster, or Quantity(1000, 'kpc')).
:param str/Quantity inner_radius: The name or value of the inner radius to use for the generation of
the spectrum (for instance 'r500' would be acceptable for a GalaxyCluster, or Quantity(300, 'kpc')). By
default this is zero arcseconds, resulting in a circular spectrum.
:param bool disable_progress: Setting this to true will turn off the SAS generation progress bar.
:param str obs_id: Only used if the 'region' radius name is passed, the ObsID to retrieve the region for.
:return: The source objects, a list of inner radius quantities, and a list of outer radius quantities.
:rtype: Tuple[Union[BaseSource, BaseSample], List[Quantity], List[Quantity]]
"""
# NullSources are not allowed to have spectra, as they can have any observations associated and thus won't
# necessarily overlap
if isinstance(sources, NullSource):
raise TypeError("You cannot create spectra of a NullSource")

if isinstance(sources, BaseSource):
sources = [sources]

# Checking that the user hasn't passed BaseSources
if not all([type(src) != BaseSource for src in sources]):
raise TypeError("You cannot generate spectra from a BaseSource object, really you shouldn't be using "
"them at all, they are mostly useful as a superclass.")

# Issuing a warning to the user that one or one sources have not been detected
if not all([src.detected for src in sources]):
warnings.warn("Not all of these sources have been detected, the spectra generated may not be helpful.")

# Checking that inner radii that have been passed into the spectrum generation aren't nonsense
if isinstance(inner_radius, str) and inner_radius not in RAD_LABELS:
raise ValueError("You have passed a radius name rather than a value for 'inner_radius', but it is "
"not a valid name, please use one of the following:\n {}".format(", ".join(RAD_LABELS)))

elif isinstance(inner_radius, str) and inner_radius in ["r2500", "r500", "r200"] and \
not all([type(src) == GalaxyCluster for src in sources]):
raise TypeError("The {} radius is only valid for GalaxyCluster objects".format(inner_radius))

# One radius can be passed for a whole sample, but this checks to make sure that if there are multiple sources,
# and multiple radii have been passed, there are the same number of sources and radii
elif isinstance(inner_radius, Quantity) and len(sources) != 1 and not inner_radius.isscalar \
and len(sources) != len(inner_radius):
raise ValueError("Your sample has {s} sources, but your inner_radius variable only has {i} entries. Please "
"pass only one inner_radius or the same number as there are "
"sources".format(s=len(sources), i=len(inner_radius)))

# Checking that outer_radius radii that have been passed into the spectrum generation aren't nonsense
if isinstance(outer_radius, str) and outer_radius not in RAD_LABELS:
raise ValueError("You have passed a radius name rather than a value for 'outer_radius', but it is "
"not a valid name, please use one of the following:\n {}".format(", ".join(RAD_LABELS)))
elif isinstance(outer_radius, str) and outer_radius in ["r2500", "r500", "r200"] and \
not all([type(src) == GalaxyCluster for src in sources]):
raise TypeError("The {} radius is only valid for GalaxyCluster objects".format(outer_radius))
elif isinstance(outer_radius, Quantity) and len(sources) != 1 and not outer_radius.isscalar \
and len(sources) != len(outer_radius):
raise ValueError("Your sample has {s} sources, but your outer_radius variable only has {o} entries. Please "
"pass only one outer_radius or the same number as there are "
"sources".format(s=len(sources), o=len(outer_radius)))

# A crude way to store the radii but I'm tired and this will work fine
final_inner = []
final_outer = []
# I need to convert the radii to the same units and compare them, and to make sure they
# are actually in distance units. The distance unit checking is done by convert_radius
for s_ind, src in enumerate(sources):
# Converts the inner and outer radius for this source into the same unit
if isinstance(outer_radius, str) and outer_radius != 'region':
cur_out_rad = src.get_radius(outer_radius, 'deg')
elif isinstance(outer_radius, str) and outer_radius == 'region':
reg = src.source_back_regions('region', obs_id)[0]
cur_out_rad = Quantity([reg.width.to('deg').value/2, reg.height.to('deg').value/2], 'deg')
elif outer_radius.isscalar:
cur_out_rad = src.convert_radius(outer_radius, 'deg')
else:
cur_out_rad = src.convert_radius(outer_radius[s_ind], 'deg')

# We need to check that the outer radius isn't region, because for region objects we ignore whatever
# inner radius has been passed and just set it 0
if outer_radius == 'region':
cur_inn_rad = Quantity([0, 0], 'deg')
elif isinstance(inner_radius, str):
cur_inn_rad = src.get_radius(inner_radius, 'deg')
elif inner_radius.isscalar:
cur_inn_rad = src.convert_radius(inner_radius, 'deg')
else:
cur_inn_rad = src.convert_radius(inner_radius[s_ind], 'deg')

# Then we can check to make sure that the outer radius is larger than the inner radius
if outer_radius != 'region' and cur_inn_rad >= cur_out_rad:
raise ValueError("The inner_radius of {s} is greater than or equal to the outer_radius".format(s=src.name))
else:
final_inner.append(cur_inn_rad)
final_outer.append(cur_out_rad)

return sources, final_inner, final_outer


def _gen_detmap_cmd(source: BaseSource, obs_id: str, inst: str, bin_size: int = 200) -> Tuple[str, str, str]:
"""
:param BaseSource source: The source for which the parent method is generating ARFs for, and that needs
a detector map.
:param str obs_id: The ObsID of the data we are generating ARFs for.
:param str inst: The instrument of the data we are generating ARFs for. NOTE - ideally this instrument WILL NOT
be used for the detector map, as it is beneficial to source a detector map from a different instrument to
the one you are generating ARFs for.
:param int bin_size: The x and y binning that should be applied to the image. Larger numbers will cause ARF
generation to be faster, but arguably the results will be less accurate.
:return: The command to generate the requested detector map (will be blank if the detector map already
exists), the path where the detmap will be after the command is run (i.e. the ObsID directory if it was
already generated, or the temporary directory if it has just been generated), and the final output path
of the detector.
:rtype: Tuple[str, str, str]
"""
# This is the command that will be filled out to generate the detmap of our dreams!
detmap_cmd = "evselect table={e} imageset={d} xcolumn=DETX ycolumn=DETY imagebinning=binSize ximagebinsize={bs} " \
"yimagebinsize={bs} {ex}"

# Some settings depend on the instrument, we use different patterns for different instruments
if "pn" in inst:
# Also the upper channel limit is different for EPN and EMOS detectors
spec_lim = 20479
# This is an expression without region information to be used for making the detmaps
# required for ARF generation, we start off assuming we'll use a MOS observation as the detmap
d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'"

# The detmap for the arfgen call should ideally not be from the same instrument as the observation,
# so for PN we preferentially select MOS2 (as MOS1 was damaged). However if there isn't a MOS2
# events list from the same observation then we select MOS1, and failing that we use PN.
try:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos2')[0]
except NotAssociatedError:
try:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos1')[0]
except NotAssociatedError:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0]
# If all is lost and there are no MOS event lists then we must revert to the PN expression
d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'"

elif "mos" in inst:
spec_lim = 11999
# This is an expression without region information to be used for making the detmaps
# required for ARF generation, we start off assuming we'll use the PN observation as the detmap
d_expr = "expression='#XMMEA_EP && (PATTERN <= 4) && (FLAG .eq. 0)'"

# The detmap for the arfgen call should ideally not be from the same instrument as the observation,
# so for MOS observations we preferentially select PN. However if there isn't a PN events list
# from the same observation then for MOS2 we select MOS1, and for MOS1 we select MOS2 (as they
# are rotated wrt one another it is still semi-valid), and failing that MOS2 will get MOS2 and MOS1
# will get MOS1.
if inst[-1] == 1:
cur = '1'
opp = '2'
else:
cur = '2'
opp = '1'
try:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='pn')[0]
except NotAssociatedError:
# If we must use a MOS detmap then we have to use the MOS expression
d_expr = "expression='#XMMEA_EM && (PATTERN <= 12) && (FLAG .eq. 0)'"
try:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos' + opp)[0]
except NotAssociatedError:
detmap_evts = source.get_products("events", obs_id=obs_id, inst='mos' + cur)[0]
else:
raise ValueError("You somehow have an illegal value for the instrument name...")

det_map = "{o}_{i}_bin{bs}_detmap.fits".format(o=detmap_evts.obs_id, i=detmap_evts.instrument, bs=bin_size)
det_map_path = os.path.join(OUTPUT, obs_id, det_map)

# If the detmap that we've decided we need already exists, then we don't need to generate it again
if os.path.exists(det_map_path):
d_cmd_str = ""
det_map_cmd_path = det_map_path
else:
# Does the same thing for the evselect command to make the detmap
d_cmd_str = detmap_cmd.format(e=detmap_evts.path, d=det_map, ex=d_expr, bs=bin_size) + "; "
det_map_cmd_path = det_map

return d_cmd_str, det_map_cmd_path, det_map_path
93 changes: 93 additions & 0 deletions xga/generate/sas/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
# Last modified by David J Turner (turne540@msu.edu) 20/02/2023, 14:04. Copyright (c) The Contributors

import os
from random import randint
from typing import Union

import numpy as np
from fitsio import read_header

from .run import sas_call
from .. import OUTPUT, NUM_CORES
from ..exceptions import InvalidProductError
from ..samples.base import BaseSample
from ..sources import BaseSource
from ..sources.base import NullSource


@sas_call
def cifbuild(sources: Union[BaseSource, NullSource, BaseSample], num_cores: int = NUM_CORES,
disable_progress: bool = False):
"""
A wrapper for the XMM cifbuild command, which will be run before many of the more complex SAS commands, to
check that a CIF compatible with the local version of SAS is available. The observation date is taken from an
event list for a given ObsID, and the analysis date is set to the date which this function is run.
:param BaseSource/NullSource/BaseSample sources: A single source object, or a sample of sources.
:param int num_cores: The number of cores to use (if running locally), default is set to
90% of available.
:param bool disable_progress: Setting this to true will turn off the SAS generation progress bar.
"""
# This function supports passing both individual sources and sets of sources
if isinstance(sources, (BaseSource, NullSource)):
sources = [sources]

# This string contains the bash code to run cifbuild
cif_cmd = "cd {d}; cifbuild calindexset=ccf.cif withobservationdate=yes " \
"observationdate={od} ; mv * ../; cd ..; rm -r {n}"

sources_cmds = []
sources_paths = []
sources_extras = []
sources_types = []
for source in sources:
cmds = []
final_paths = []
extra_info = []
for obs_id in source.obs_ids:
# Fetch an events list for this ObsID, doesn't matter which
some_evt_lists = source.get_products("events", obs_id=obs_id)
obs_date = None
for evt in some_evt_lists:
# Reads in the header of the events list file
evt_head = read_header(evt.path)
# Then extracts the observation date, this is what we need to give cifbuild
if "DATE-OBS" in evt_head:
obs_date = evt_head['DATE-OBS']
del evt_head
break
else:
del evt_head

if obs_date is None:
raise InvalidProductError("All event lists for {} are missing the DATE-OBS header, this is required to"
" run the cifbuild function.".format(obs_id))

if not os.path.exists(OUTPUT + obs_id):
os.mkdir(OUTPUT + obs_id)

dest_dir = "{out}{obs}/".format(out=OUTPUT, obs=obs_id)
temp_name = "tempdir_{}".format(randint(0, 1e+8))
temp_dir = dest_dir + temp_name + "/"

final_path = dest_dir + "ccf.cif"
if not os.path.exists(final_path):
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
cmds.append(cif_cmd.format(d=temp_dir, od=obs_date, n=temp_name))
final_paths.append(final_path)
extra_info.append({}) # This doesn't need any extra information

sources_cmds.append(np.array(cmds))
sources_paths.append(np.array(final_paths))
sources_extras.append(np.array(extra_info))
sources_types.append(np.full(sources_cmds[-1].shape, fill_value="ccf"))

stack = False # This tells the sas_call routine that this command won't be part of a stack
execute = True # This should be executed immediately

return sources_cmds, stack, execute, num_cores, sources_types, sources_paths, sources_extras, disable_progress



Loading

0 comments on commit 3721662

Please sign in to comment.