diff --git a/punchbowl/level3/polarization.py b/punchbowl/level3/polarization.py index 14824a89..af268920 100644 --- a/punchbowl/level3/polarization.py +++ b/punchbowl/level3/polarization.py @@ -19,9 +19,9 @@ def convert_polarization( NDCube(data=input_data[i].data, wcs=input_data.wcs.dropaxis(2), meta={"POLAR": angle})) - for (label, i, angle) in zip(["M", "Z", "P"], - [0, 1, 2], - [-60*u.deg, 0*u.deg, 60*u.deg], strict=False)] + for (label, i, angle) in zip(["M", "Z", "P"], + [0, 1, 2], + [-60 * u.deg, 0 * u.deg, 60 * u.deg], strict=False)] data_collection = NDCollection(collection_contents, aligned_axes="all") resolved_data_collection = resolve(data_collection, "BpB", imax_effect=False) diff --git a/punchbowl/level3/stellar.py b/punchbowl/level3/stellar.py index 75ef91ed..f4e1fa37 100644 --- a/punchbowl/level3/stellar.py +++ b/punchbowl/level3/stellar.py @@ -2,13 +2,15 @@ import numpy as np import remove_starfield -from ndcube import NDCube +import astropy.units as u +from solpolpy import resolve +from ndcube import NDCube, NDCollection from prefect import flow, get_run_logger from remove_starfield import ImageHolder, ImageProcessor, Starfield from remove_starfield.reducers import GaussianReducer from punchbowl.data import NormalizedMetadata, load_ndcube_from_fits -from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial +from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial, get_p_angle from punchbowl.prefect import punch_task @@ -24,7 +26,83 @@ def load_image(self, filename: str) -> ImageHolder: cube = load_ndcube_from_fits(filename, key="A") return ImageHolder(cube.data[self.layer], cube.wcs.celestial, cube.meta) +def to_celestial(input_data: NDCube) -> NDCube: + """ + Convert polarization from mzpsolar to Celestial frame. + + All images need their polarization converted to Celestial frame + to generated the background starfield model. + """ + # Create a data collection for M, Z, P components + mzp_angles = [-60, 0, 60]*u.degree + + # Compute new angles for celestial frame + cel_north_offset = get_p_angle(time=input_data[0].meta["DATE-OBS"].value) + new_angles = mzp_angles - cel_north_offset + + collection_contents = [ + (label, + NDCube(data=input_data[i].data, + wcs=input_data.wcs.dropaxis(2), + meta={"POLAR": angle})) + for label, i, angle in zip(["M", "Z", "P"], [0, 1, 2], mzp_angles, strict=False) + ] + data_collection = NDCollection(collection_contents, aligned_axes="all") + + # Resolve data to celestial frame + celestial_data_collection = resolve(data_collection, "npol", out_angles=new_angles, imax_effect=False) + + valid_keys = [key for key in celestial_data_collection if key != "alpha"] + new_data = [celestial_data_collection[key].data for key in valid_keys] + new_wcs = input_data.wcs.copy() + + output_meta = NormalizedMetadata.load_template("PTM", "3") + output_meta["DATE-OBS"] = input_data.meta["DATE-OBS"].value + + output = NDCube(data=new_data, wcs=new_wcs, meta=output_meta) + output.meta.history.add_now("LEVEL3-convert2celestial", "Convert mzpsolar to Celestial") + + return output + + +def from_celestial(input_data: NDCube) -> NDCube: + """ + Convert polarization from Celestial frame to mzpsolar. + + All images need their polarization converted back to Solar frame + after removing the stellar polarization. + """ + # Create a data collection for M, Z, P components + mzp_angles = [-60, 0, 60]*u.degree + # Compute new angles for celestial frame + cel_north_offset = get_p_angle(time=input_data[0].meta["DATE-OBS"].value) + new_angles = mzp_angles - cel_north_offset + collection_contents = [ + (f"{angle.value} deg", + NDCube(data=input_data[i].data, + wcs=input_data.wcs.dropaxis(2), + meta={"POLAR": angle})) + for i, angle in enumerate(new_angles) + ] + data_collection = NDCollection(collection_contents, aligned_axes="all") + + # Resolve data to mzpsolar frame + solar_data_collection = resolve(data_collection, "mzpsolar", imax_effect=False) + + valid_keys = [key for key in solar_data_collection if key != "alpha"] + new_data = [solar_data_collection[key].data for key in valid_keys] + new_wcs = input_data.wcs.copy() + + output_meta = NormalizedMetadata.load_template("PTM", "2") + output_meta["DATE-OBS"] = input_data.meta["DATE-OBS"].value + + output = NDCube(data=new_data, wcs=new_wcs, meta=output_meta) + output.meta.history.add_now("LEVEL3-convert2mzpsolar", "Convert Celestial to mzpsolar") + + return output + +# TODO: Use to_celestial() before generating starfield @flow(log_prints=True) def generate_starfield_background( filenames: list[str], @@ -162,7 +240,7 @@ def subtract_starfield_background_task(data_object: NDCube, return output - +#TODO: Pass the outputs through from_celestial() to convert everything back to mzpsolar def create_empty_starfield_background(data_object: NDCube) -> np.ndarray: """Create an empty starfield background map.""" return np.zeros_like(data_object.data) diff --git a/punchbowl/level3/tests/test_stellar.py b/punchbowl/level3/tests/test_stellar.py index 7fa9bea8..4e41ef1f 100644 --- a/punchbowl/level3/tests/test_stellar.py +++ b/punchbowl/level3/tests/test_stellar.py @@ -1,12 +1,17 @@ import numpy as np import pytest +import astropy.units as u from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS from ndcube import NDCube from remove_starfield import Starfield - +from prefect import flow, get_run_logger from punchbowl.data import NormalizedMetadata -from punchbowl.level3.stellar import generate_starfield_background, subtract_starfield_background_task +from datetime import datetime +from punchbowl.level3.stellar import (generate_starfield_background, + subtract_starfield_background_task, + from_celestial, to_celestial) +from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial, get_p_angle @pytest.fixture() @@ -44,7 +49,48 @@ def zero_starfield_data(shape: tuple = (256, 256)) -> Starfield: return Starfield(starfield=starfield, wcs=wcs) -# + +@pytest.fixture +def sample_ndcube() -> NDCube: + def _sample_ndcube(shape: tuple, code: str = "PTM", level: str = "2") -> NDCube: + data = np.ones(shape).astype(np.float32) + sqrt_abs_data = np.sqrt(np.abs(data)) + uncertainty = StdDevUncertainty(np.interp(sqrt_abs_data, (sqrt_abs_data.min(), sqrt_abs_data.max()), + (0, 1)).astype(np.float32)) + wcs = WCS(naxis=3) + wcs.ctype = "WAVE", "HPLT-TAN", "HPLN-TAN" + wcs.cdelt = 0.2, 0.1, 0.1 + wcs.cunit = "Angstrom", "deg", "deg" + wcs.crpix = 0, 0, 0 + wcs.crval = 5, 1, 1 + + meta = NormalizedMetadata.load_template(code, level) + meta["DATE-OBS"] = str(datetime(2024, 3, 21, 00, 00, 00)) + meta["FILEVRSN"] = "1" + return NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta) + + return _sample_ndcube + + +def test_from_celestial(sample_ndcube) -> NDCube: + test_cube = sample_ndcube((3, 10, 10)) + data_solar = from_celestial(test_cube) + expected_solar = np.full((3, 10, 10), 1.0, dtype=np.float32) + assert isinstance(data_solar, NDCube) + assert np.allclose(data_solar.data, expected_solar) + +def test_to_celestial(sample_ndcube) -> NDCube: + test_cube = sample_ndcube((3, 10, 10)) + data_cel = to_celestial(test_cube) + expected_cel = np.full((3, 10, 10), 1.0, dtype=np.float32) + assert isinstance(data_cel, NDCube) + assert np.allclose(data_cel.data, expected_cel) + + + # for i in enumerate(angles): + # assert np.allclose(angles[i], test_cube[i].meta["POLAR"]*u.degree) + + # def test_basic_subtraction(one_data: NDCube, zero_starfield_data: Starfield) -> None: # """ #