Skip to content

Commit

Permalink
Made a start on the new simple match function that will use any and a…
Browse files Browse the repository at this point in the history
…ll telescopes (and will also be called separation match because that is way more informative)
  • Loading branch information
DavidT3 committed Sep 22, 2023
1 parent ae5222c commit c9fb519
Showing 1 changed file with 155 additions and 3 deletions.
158 changes: 155 additions & 3 deletions xga/sourcetools/match.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# This code is a part of X-ray: Generate and Analyse (XGA), a module designed for the XMM Cluster Survey (XCS).
# Last modified by David J Turner (turne540@msu.edu) 22/09/2023, 15:27. Copyright (c) The Contributors
# Last modified by David J Turner (turne540@msu.edu) 22/09/2023, 17:01. Copyright (c) The Contributors
import gc
import os
from copy import deepcopy
from multiprocessing import Pool
from typing import Union, Tuple, List
from warnings import warn

import numpy as np
import pandas as pd
Expand All @@ -15,8 +16,9 @@
from regions import read_ds9, PixelRegion, SkyRegion
from tqdm import tqdm

from .. import CENSUS, BLACKLIST, NUM_CORES, OUTPUT, xga_conf
from ..exceptions import NoMatchFoundError, NoValidObservationsError, NoRegionsError, XGAConfigError
from .. import CENSUS, BLACKLIST, NUM_CORES, OUTPUT, xga_conf, TELESCOPES, USABLE
from ..exceptions import NoMatchFoundError, NoValidObservationsError, NoRegionsError, XGAConfigError, \
NoTelescopeDataError, InvalidTelescopeError
from ..utils import SRC_REGION_COLOURS


Expand Down Expand Up @@ -314,6 +316,156 @@ def _in_region(ra: Union[float, List[float], np.ndarray], dec: Union[float, List
return obs_id, matched


# TODO rejig the docstring, mention the new behaviour with default search distances
def seperation_match(src_ra: Union[float, np.ndarray], src_dec: Union[float, np.ndarray],
                     distance: Union[Quantity, dict] = None, telescope: Union[str, list] = None,
                     num_cores: int = NUM_CORES) \
        -> Tuple[Union[DataFrame, List[DataFrame]], Union[DataFrame, List[DataFrame]]]:
    """
    Returns ObsIDs within a given distance from the input ra and dec values.

    :param float/np.ndarray src_ra: RA coordinate(s) of the source(s), in degrees. To find matches for multiple
        coordinate pairs, pass an array.
    :param float/np.ndarray src_dec: DEC coordinate(s) of the source(s), in degrees. To find matches for multiple
        coordinate pairs, pass an array.
    :param Quantity/dict distance: The angular distance to search for observations within. NOTE: default
        per-telescope search distances (and per-telescope distance dictionaries) are not yet implemented, so
        currently a single angular Quantity must be passed.
    :param str/list telescope: The telescope(s) whose data should be searched for matches. Default is None, in
        which case all telescopes set up with this installation of XGA will be searched.
    :param int num_cores: The number of cores to use, default is set to 90% of system cores. This is only relevant
        if multiple coordinate pairs are passed.
    :return: A dataframe containing ObsID, RA_PNT, and DEC_PNT of matching observations, and a dataframe
        containing information on observations that would have been a match, but that are in the blacklist.
    :rtype: Tuple[Union[DataFrame, List[DataFrame]], Union[DataFrame, List[DataFrame]]]
    """

    # If the telescope is set to None then the function will search all the telescopes for matching data,
    #  determining which are available by grabbing all the keys from the CENSUS dictionary
    if telescope is None:
        telescope = list(CENSUS.keys())
        # If there are no keys in the CENSUS dictionary then we know that no telescopes have been successfully
        #  setup with XGA and this function isn't going to work
        if len(telescope) == 0:
            raise NoTelescopeDataError("No telescope data is currently available to XGA.")

    else:
        # Just making sure the telescope names supplied by the user are lowercase, as that is how they are stored in
        #  XGA constants and products - also have to account for the fact that either a single string or a list
        #  can be passed
        if not isinstance(telescope, list):
            telescope = [telescope.lower()]
        else:
            telescope = [t.lower() for t in telescope]

        # Here we check if ANY of the passed telescopes aren't actually recognised by XGA, as I want to tell them
        #  that they have either made a typo or are labouring under a misconception about which telescopes are
        #  supported
        if any([t not in TELESCOPES for t in telescope]):
            which_bad = [t for t in telescope if t not in TELESCOPES]
            raise InvalidTelescopeError("XGA does not support the following telescopes; "
                                        "{}".format(', '.join(which_bad)))
        # If the user made specific requests of telescope, and they are ALL not available, we throw an error
        elif all([not USABLE[t] for t in telescope]):
            raise NoTelescopeDataError("None of the requested telescopes ({}) have data available to "
                                       "XGA.".format(', '.join(telescope)))
        # However if the user made specific requests of telescope, and SOME are not available then they get a warning
        elif any([not USABLE[t] for t in telescope]):
            # This isn't elegant, but oh well - we have to make sure that we only let those telescopes through
            #  that have actually been set up and are working with XGA
            which_bad = [t for t in telescope if not USABLE[t]]
            telescope = [t for t in telescope if USABLE[t]]
            warn("Some requested telescopes are not currently set up with XGA; {}".format(", ".join(which_bad)),
                 stacklevel=2)

    # NOTE(review): 'telescope' is validated above but is not yet passed through to the search itself - the
    #  search below still goes through _simple_search only. Presumably per-telescope searching is still to
    #  be implemented (see TODO above) - confirm before relying on multi-telescope behaviour.

    # Default search distances have not been implemented yet (see TODO above), so rather than letting the
    #  unit conversion below fail with an opaque AttributeError on None, raise something informative
    if distance is None:
        raise ValueError("Default search distances are not yet implemented; please pass an angular Quantity "
                         "to the 'distance' argument.")

    # Extract the search distance as a float, specifically in degrees
    rad = distance.to('deg').value

    # Here we perform a check to see whether a set of coordinates is being passed, and if so are the two
    #  arrays the same length
    if isinstance(src_ra, np.ndarray) and isinstance(src_dec, np.ndarray) and len(src_ra) != len(src_dec):
        raise ValueError("If passing multiple pairs of coordinates, src_ra and src_dec must be of the same length.")
    # Just one coordinate is also allowed, but still want it to be iterable so put it in an array. Plain Python
    #  ints are accepted as well as floats, as they are perfectly valid single coordinate values
    elif isinstance(src_ra, (float, int)) and isinstance(src_dec, (float, int)):
        src_ra = np.array([src_ra])
        src_dec = np.array([src_dec])
        # A single coordinate never benefits from multiprocessing
        num_cores = 1
    # Don't want one input being a single number and one being an array
    elif type(src_ra) != type(src_dec):
        raise TypeError("src_ra and src_dec must be the same type, either both floats or both arrays.")

    # The prog_dis variable controls whether the tqdm progress bar is displayed or not, don't want it to be there
    #  for single coordinate pairs (tqdm's 'disable' argument is True to hide the bar)
    if len(src_ra) != 1:
        prog_dis = False
    else:
        prog_dis = True

    # The dictionary stores match dataframe information, with the keys comprised of the str(ra)+str(dec)
    c_matches = {}
    # This dictionary stores any ObsIDs that were COMPLETELY blacklisted (i.e. all instruments were excluded) for
    #  a given coordinate. So they were initially found as being nearby, but then completely removed
    fully_blacklisted = {}

    # This helps keep track of the original coordinate order, so we can return information in the same order it
    #  was passed in
    order_list = []
    # If we only want to use one core, we don't set up a pool as it could be that a pool is open where
    #  this function is being called from
    if num_cores == 1:
        # Set up the tqdm instance in a with environment
        with tqdm(desc='Searching for observations near source coordinates', total=len(src_ra),
                  disable=prog_dis) as onwards:
            # Simple enough, just iterates through the RAs and Decs calling the search function and stores the
            #  results in the dictionary
            for ra_ind, r in enumerate(src_ra):
                d = src_dec[ra_ind]
                search_results = _simple_search(r, d, rad)
                # repr() is used for the keys so that the full float precision is preserved, avoiding
                #  key collisions between very close coordinates
                c_matches[repr(r) + repr(d)] = search_results[2]
                fully_blacklisted[repr(r) + repr(d)] = search_results[3]
                order_list.append(repr(r)+repr(d))
                onwards.update(1)
    else:
        # This is all equivalent to what's above, but with function calls added to the multiprocessing pool
        with tqdm(desc="Searching for observations near source coordinates", total=len(src_ra)) as onwards, \
                Pool(num_cores) as pool:
            def match_loop_callback(match_info):
                # Runs in the parent process as each _simple_search task completes, storing that task's results
                nonlocal onwards  # The progress bar will need updating
                nonlocal c_matches
                c_matches[repr(match_info[0]) + repr(match_info[1])] = match_info[2]
                fully_blacklisted[repr(match_info[0]) + repr(match_info[1])] = match_info[3]

                onwards.update(1)

            for ra_ind, r in enumerate(src_ra):
                d = src_dec[ra_ind]
                order_list.append(repr(r)+repr(d))
                pool.apply_async(_simple_search, args=(r, d, rad), callback=match_loop_callback)

            pool.close()  # No more tasks can be added to the pool
            pool.join()  # Joins the pool, the code will only move on once the pool is empty.

    # Changes the order of the results to the original pass in order and stores them in a list
    results = [c_matches[n] for n in order_list]
    bl_results = [fully_blacklisted[n] for n in order_list]
    # These can be large for many coordinates, so release them before returning
    del c_matches
    del fully_blacklisted

    # Result length of one means one coordinate was passed in, so we should pass back out a single dataframe
    #  rather than a single dataframe in a list
    if len(results) == 1:
        results = results[0]
        bl_results = bl_results[0]

        # Checks whether the dataframe inside the single result is length zero, if so then there are no
        #  relevant ObsIDs
        if len(results) == 0:
            raise NoMatchFoundError("No XMM observation found within {a} of ra={r} "
                                    "dec={d}".format(r=round(src_ra[0], 4), d=round(src_dec[0], 4), a=distance))
    # If all the dataframes in the results list are length zero, then none of the coordinates has a
    #  valid ObsID
    elif all([len(r) == 0 for r in results]):
        raise NoMatchFoundError("No XMM observation found within {a} of any input coordinate pairs".format(a=distance))

    return results, bl_results


def simple_xmm_match(src_ra: Union[float, np.ndarray], src_dec: Union[float, np.ndarray],
distance: Quantity = Quantity(30.0, 'arcmin'), num_cores: int = NUM_CORES) \
-> Tuple[Union[DataFrame, List[DataFrame]], Union[DataFrame, List[DataFrame]]]:
Expand Down

0 comments on commit c9fb519

Please sign in to comment.