diff --git a/corkit/lasco.py b/corkit/lasco.py index 56c75c4..7f2322a 100644 --- a/corkit/lasco.py +++ b/corkit/lasco.py @@ -31,10 +31,10 @@ """Pypi dependencies""" from astropy.visualization import HistEqStretch, ImageNormalize +from typing import Union, Dict, Tuple, List, Optional from datetime import datetime, timedelta import matplotlib.pyplot as plt from astropy.io import fits -from typing import Union, Dict, Tuple from io import BytesIO import numpy as np import matplotlib @@ -44,6 +44,7 @@ import copy import glob import os +from icecream import ic from . import __version__ @@ -54,78 +55,145 @@ # LASCO # ############## # done -def level_1(fits_file: Union[str, BytesIO], target_path: str = None, **kwargs) -> None: - filetype: str = target_path.split(".")[-1] +def level_1( + fits_files: Union[str, BytesIO, List[Union[str, BytesIO]]], + target_path: Optional[str] = None, + format: Optional[str] = "fits", + *args, + **kwargs, +) -> None: + assert format in [ + "fits", + "fts", + "jpg2", + "png", + ], "Must be fits, fts, jpg2 or png filetype" + os.makedirs(target_path, exist_ok=True) + if target_path is not None: + assert format is not None, "Must define the extension of the file" # Import data - print("Importing data") - img0, header = FITS(fits_file) - if check_05(header): - pass - else: + if isinstance(fits_files, (BytesIO, str)): + print("Importing data") + try: + img0, header = FITS(fits_files) + filename = fits_files.split("/")[-1][:-4] + except OSError: + print("Corrupted file, try downloading again. Deleting...") + os.remove(fits_files) + if check_05(header): + pass + else: + print( + "File is already level 1.(Implemented just for CorKit derived fits files.)" + ) + return img0, header + if img0 is None: + print("FITS file doesn't contain image data, avorting and deleting...") + os.remove(fits_files) + return None, None + detector: str = header["detector"].strip().upper() + header.add_history( + f"corkit/lasco.py level_1: (function) {__version__}, 12/04/24" + ) + if "detector" in kwargs: + assert ( + kwargs["detector"].upper() in ["C2", "C3"] + and detector == kwargs["detector"].upper() + ), f'Not valid detector or file is not from LASCO {kwargs["detector"].upper()}, make sure all files in fits_files parameters are from the same detector and from LASCO.' + # Applying pixelwise implemented corrections + xsumming: float = np.maximum(header["sumcol"], 1) * np.maximum( + header["lebxsum"], 1 + ) + ysumming: float = np.maximum(header["sumrow"], 1) * np.maximum( + header["lebysum"], 1 + ) + summing: float = xsumming * ysumming + if summing > 1: + img0: np.array = fixwrap(img0) + dofull: bool = False + else: + dofull: bool = True + if ( + header["r2col"] + - header["r1col"] + + header["r2row"] + - header["r1row"] + - 1023 + - 1023 + != 0 + ): + img0: np.array = reduce_std_size(img0, header, FULL=dofull) + print( - "File is already level 1.(Implemented just for CorKit derived fits files.)" + f'LASCO-{header["detector"]}:{header["filename"]}:{header["date-obs"]}T{header["time-obs"]}...' + ) + match detector: + case "C2": + b, header = c2_calibrate(img0, header, **kwargs) + b, header = c2_warp(b, header) + zz: np.array = np.where(img0 <= 0) + maskall: np.array = np.ones((header["naxis1"], header["naxis2"])) + maskall[zz] = 0 + maskall, _ = c2_warp(maskall, header) + b *= maskall + case "C3": + b, header = c3_calibrate(img0, header, *args, **kwargs) + bn, header = c3_warp(b, header) + zz: np.array = np.where(img0 <= 0) + maskall: np.array = np.ones((header["naxis1"], header["naxis2"])) + maskall[zz] = 0 + maskallw, _ = c3_warp(maskall, header) + b = bn * maskallw * correct_var(header, args[1])[0] + + img, header = final_step( + target_path, format, b, header, xsumming, ysumming, **kwargs ) - if img0 is None: - print("FITS file doesn't contain image data, avorting and deleting...") - os.remove(fits_file) - return np.ones((1024, 1024)), {"Not valid"} - detector: str = header["detector"].strip().upper() - header.add_history(f"corkit/lasco.py level_1: (function) {__version__}, 12/04/24") - assert detector in [ - "C2", - "C3", - ], f"Not valid detector, not implemented for {detector}" - # Applying pixelwise implemented corrections - xsumming: float = np.maximum(header["sumcol"], 1) * np.maximum(header["lebxsum"], 1) - ysumming: float = np.maximum(header["sumrow"], 1) * np.maximum(header["lebysum"], 1) - summing: float = xsumming * ysumming - if summing > 1: - img0: np.array = fixwrap(img0) - dofull: bool = False - else: - dofull: bool = True - if ( - header["r2col"] - - header["r1col"] - + header["r2row"] - - header["r1row"] - - 1023 - - 1023 - != 0 - ): - img: np.array = reduce_std_size(img0, header, FULL=dofull) - print( - f'LASCO-{header["detector"]}:{header["filename"]}:{header["date-obs"]}T{header["time-obs"]}...' - ) - match detector: - case "C2": - b, header = c2_calibrate(img0, header, **kwargs) - b, header = c2_warp(b, header) - zz: np.array = np.where(img0 <= 0) - maskall: np.array = np.ones((header["naxis1"], header["naxis2"])) - maskall[zz] = 0 - maskall, _ = c2_warp(maskall, header) - b *= maskall - case "C3": - b, header = c3_calibrate(img0, header, **kwargs) - bn, header = c3_warp(b, header) - zz: np.array = np.where(img0 <= 0) - maskall: np.array = np.ones((header["naxis1"], header["naxis2"])) - maskall[zz] = 0 - maskallw, _ = c3_warp(maskall, header) - _, mask = _read_mask_full(header) - b = bn * maskallw * mask - - img, header = final_step( - target_path, filetype, b, header, xsumming, ysumming, **kwargs - ) - header["level_1"] = "True" + header["level_1"] = 1 - if target_path is not None: - save(target_path, filetype, img, header) + if target_path is not None: + save(target_path, filename.replace(".", ""), format, img, header) - return img, header + print("Done!") + return img, header + + elif isinstance(fits_files, list): + _, sample_hdr = FITS(fits_files[0]) + detector = sample_hdr["detector"].strip().upper() + out: List[Tuple[np.array, fits.Header]] = [] + + match detector: + case "C2": + vig_fn = os.path.join(DEFAULT_SAVE_DIR, "c2vig_final.fts") + vig_full = fits.getdata(vig_fn) + for filepath in fits_files: + img, header = level_1( + filepath, + target_path, + format, + **kwargs, + detector=detector, + vig_full=vig_full, + ) + out.append((img, header)) + case "C3": + vig_pre, vig_post = _read_vig_full() + mask = _read_mask_full() + ramp = _read_ramp_full() + bkg = _read_bkg_full() + args = (vig_pre, vig_post, mask, ramp, bkg) + for filepath in fits_files: + img, header = level_1( + filepath, + target_path, + format, + *args, + **kwargs, + detector=detector, + ) + out.append((img, header)) + + return out # done @@ -319,45 +387,38 @@ def _read_bkg_full(): # done -def _read_ramp_full(header: fits.Header) -> Tuple[fits.Header, np.array]: +def _read_ramp_full() -> np.array: ramp_path = os.path.join(DEFAULT_SAVE_DIR, "C3ramp.fts") ramp = fits.getdata(ramp_path) - header.add_history("C3ramp.fts 1999/03/18") - return header, ramp + return ramp # done -def _read_mask_full(header: fits.Header) -> Tuple[fits.Header, np.array]: +def _read_mask_full() -> np.array: msk_fn = os.path.join(DEFAULT_SAVE_DIR, "c3_cl_mask_lvl1.fts") mask = fits.getdata(msk_fn) - header.add_history("c3_cl_mask_lvl1.fts 2005/08/08") - return header, mask + return mask # done -def _read_vig_full( - date: Union[int, float], header: fits.Header -) -> Tuple[fits.Header, np.array]: - if date < 51000: - vig_path = os.path.join(DEFAULT_SAVE_DIR, "c3vig_preint_final.fts") - header.add_history("c3vig_preint_final.fts") - else: - vig_path = os.path.join(DEFAULT_SAVE_DIR, "c3vig_postint_final.fts") - header.add_history("c3vig_postint_final.fts") - vig = fits.getdata(vig_path) - return header, vig +def _read_vig_full() -> Tuple[np.array, np.array]: + vig_pre = os.path.join(DEFAULT_SAVE_DIR, "c3vig_preint_final.fts") + vig_post = os.path.join(DEFAULT_SAVE_DIR, "c3vig_postint_final.fts") + vig_pre = fits.getdata(vig_pre) + vig_post = fits.getdata(vig_post) + return vig_pre, vig_post # done -def c3_calibrate(img0: np.array, header: fits.Header, **kwargs): +def c3_calibrate(img0: np.array, header: fits.Header, *args, **kwargs): assert header["detector"] == "C3", "Not valid C3 fits file" - if check_05: + if check_05(header): pass else: print("This file is already a Level 1 product.") - return img, header + return img0, header # returns the date mjd = header["mid_date"] @@ -377,10 +438,23 @@ def c3_calibrate(img0: np.array, header: fits.Header, **kwargs): header.add_history("No calibration factor: 1") # Get mask, ramp, bkg(fuzzy) and vignetting function - header, vig = _read_vig_full(mjd, header) - header, mask = _read_mask_full(header) - header, ramp = _read_ramp_full(header) - bkg = _read_bkg_full() + if args: + vig_pre, vig_post, mask, ramp, bkg = args + else: + vig_pre, vig_post = _read_vig_full() + mask = _read_mask_full() + ramp = _read_ramp_full() + bkg = _read_bkg_full() + + header.add_history("C3ramp.fts 1999/03/18") + header.add_history("c3_cl_mask_lvl1.fts 2005/08/08") + + if mjd < 51000: + vig = vig_pre + header.add_history("c3vig_preint_final.fts") + else: + vig = vig_post + header.add_history("c3vig_postint_final.fts") if not "NO_VIG" in kwargs: pass @@ -619,11 +693,14 @@ def c2_calibrate( ) -> Tuple[np.array, fits.Header]: assert header["detector"] == "C2", "This is not a C2 valid fits file" - if check_05: + if check_05(header): pass else: print("This file is already a Level 1 product.") - return img, header + return img0, header + + if "vig_full" in kwargs: + vig_full = kwargs["vig_full"] # Get exposure factor and dark current offset header, expfac, bias = get_exp_factor(header) # change for python imp @@ -639,20 +716,21 @@ def c2_calibrate( calfac = 1.0 # Read vignetting function and mask - vig_fn = os.path.join(DEFAULT_SAVE_DIR, "c2vig_final.fts") - vig_full = fits.getdata(vig_fn) + if vig_full is not None: + pass + else: + vig_fn = os.path.join(DEFAULT_SAVE_DIR, "c2vig_final.fts") + vig_full = fits.getdata(vig_fn) if not "NO_VIG" in kwargs: # Apply mask to vignetting correction vig_full[vig_full < 0.0] = 0.0 vig_full[vig_full > 100.0] = 100.0 + header.add_history(f"c2vig_final.fts") else: vig_full = np.ones_like(vig_full) - header.add_history(f"c2vig_final.fts") - vig_full = correct_var(header, vig_full)[0] - img = c2_calibration_forward(img0, header, calfac, vig_full) header.add_history( @@ -801,7 +879,10 @@ def __calc_cme_mass( if nb > 0: b[wb] = 1 d = header["date-obs"] - yymmdd = d.replace("/", "").strip()[2:] + if check_05(header): + yymmdd = d.replace("/", "").strip()[2:] + else: + yymmdd = d.split("T")[0].replace("/", "").strip()[2:] tel = header["telescop"].strip().upper() solar_radius = ( solar_ephem(yymmdd, True) if tel == "SOHO" else solar_ephem(yymmdd, False) @@ -824,7 +905,13 @@ def __calc_cme_mass( return mass if "ALL" in kwargs else totmass - def mass(self, bn: str, fn: str, **kwargs) -> np.array: + def mass( + self, + bn: Union[str, Tuple[np.array, fits.Header]], + fn: Union[str, List[str]], + *args, + **kwargs, + ) -> Union[np.array, List[np.array]]: """ # CME().mass Computes the mass for the Coronal Mass ejection in fn (filepath) given @@ -832,69 +919,103 @@ def mass(self, bn: str, fn: str, **kwargs) -> np.array: bn (str): filepath for the base image. fn (str): filepath for the target image. """ - b, hb = FITS(bn) - a, ha = FITS(fn) - assert ( - hb["detector"].strip() == ha["detector"].strip() - ), "Base image and output final image should belong to the same detector" - nx = ha["naxis1"] - ny = ha["naxis2"] - - match ha["detector"].strip().upper(): - case "C3": - cala, ha = c3_calibrate(a, ha) - calb, hb = c3_calibrate(b, hb) - case "C2": - cala, ha = c2_calibrate(a, ha) - calb, hb = c2_calibrate(b, hb) - sumxb = np.maximum(hb["sumcol"], 1) * hb["lebxsum"] - sumyb = np.maximum(hb["sumrow"], 1) * hb["lebysum"] - sumxa = np.maximum(ha["sumcol"], 1) * ha["lebxsum"] - sumya = np.maximum(ha["sumrow"], 1) * ha["lebysum"] - - if (sumxb != sumxa) or (sumyb != sumya): - calb = rebin(calb, hb["naxis1"] / sumxb, hb["naxis2"] / sumyb) - hb["lebxsum"] = sumxa / np.maximum(hb["sumcol"], 1) - hb["lebysum"] = sumya / np.maximum(hb["sumrow"], 1) - sb = b.shape - hb["naxis1"] = sb[0] - hb["naxis2"] = sb[1] - if nx != hb["naxis1"] or ny != hb["naxis2"]: - r1col = np.maximum(ha.r1col, hb.r1col) - r2col = np.minimum(ha.r2col, hb.r1col) - r1row = np.maximum(ha.r1row, hb.r1row) - r2row = np.minimum(ha.r2row, hb.r1row) - xa1 = (ha.r1col - r1col) / sumxa - xa2 = (ha.r2col - r2col) / sumxa - ya1 = (ha.r1row - r1row) / sumya - ya2 = (ha.r2row - r2row) / sumya - - cala = cala[ya1 : ya2 + 1, xa1 : xa2 + 1] - calb = calb[ya1 : ya2 + 1, xa1 : xa2 + 1] - - ha.r1col = r1col - hb.r1col = r1col - ha.r1row = r1row - hb.r1row = r1row - ha.r2col = r1col - hb.r2col = r2col - ha.r2row = r2row - hb.r2row = r2row - ha.naxis1 = xa2 - xa1 + 1 - hb.naxis1 = ha.naxis1 - ha.naxis2 = ya2 - ya1 + 1 - hb.naxis2 = ha.naxis2 - - dif = cala - calb - - mass = self.__calc_cme_mass(dif, ha, None, ALL=True) - dte = ha["date-obs"] + " " + ha["time-obs"].strip() - - if "trg_path" in kwargs: - with open(kwargs["trg_path"], "a") as file: - file.write(f"{dte},{mass}\n") - - return mass + if isinstance(bn, tuple): + b, hb = bn + else: + b, hb = FITS(bn) + + if isinstance(fn, str): + a, ha = FITS(fn) + assert ( + hb["detector"].strip() == ha["detector"].strip() + ), "Base image and output final image should belong to the same detector" + nx: int = ha["naxis1"] + ny: int = ha["naxis2"] + + match ha["detector"].strip().upper(): + case "C3": + cala, ha = c3_calibrate(a, ha, *args, **kwargs) + calb, hb = c3_calibrate(b, hb, *args, **kwargs) + case "C2": + cala, ha = c2_calibrate(a, ha, **kwargs) + calb, hb = c2_calibrate(b, hb, **kwargs) + sumxb = int(np.maximum(hb["sumcol"], 1)) * hb["lebxsum"] + sumyb = int(np.maximum(hb["sumrow"], 1)) * hb["lebysum"] + sumxa = int(np.maximum(ha["sumcol"], 1)) * ha["lebxsum"] + sumya = int(np.maximum(ha["sumrow"], 1)) * ha["lebysum"] + + if (sumxb != sumxa) or (sumyb != sumya): + calb = rebin(calb, hb["naxis1"] / sumxb, hb["naxis2"] / sumyb) + hb["lebxsum"] = sumxa // int(np.maximum(hb["sumcol"], 1)) + hb["lebysum"] = sumya // int(np.maximum(hb["sumrow"], 1)) + sb = b.shape + hb["naxis1"] = sb[0] + hb["naxis2"] = sb[1] + if nx != hb["naxis1"] or ny != hb["naxis2"]: + r1col: int = int(np.maximum(ha["r1col"], hb["r1col"])) + r2col: int = int(np.minimum(ha["r2col"], hb["r1col"])) + r1row: int = int(np.maximum(ha["r1row"], hb["r1row"])) + r2row: int = int(np.minimum(ha["r2row"], hb["r1row"])) + xa1: int = (ha["r1col"] - r1col) // sumxa + xa2: int = (ha["r2col"] - r2col) // sumxa + ya1: int = (ha["r1row"] - r1row) // sumya + ya2: int = (ha["r2row"] - r2row) // sumya + + cala = cala[int(ya1) : int(ya2 + 1), int(xa1) : int(xa2 + 1)] + calb = calb[int(ya1) : int(ya2 + 1), int(xa1) : int(xa2 + 1)] + + ha["r1col"] = r1col + hb["r1col"] = r1col + ha["r1row"] = r1row + hb["r1row"] = r1row + ha["r2col"] = r1col + hb["r2col"] = r2col + ha["r2row"] = r2row + hb["r2row"] = r2row + ha["naxis1"] = xa2 - xa1 + 1 + hb["naxis1"] = ha["naxis1"] + ha["naxis2"] = ya2 - ya1 + 1 + hb["naxis2"] = ha["naxis2"] + + dif = cala - calb + + mass = self.__calc_cme_mass(dif, ha, None, ALL=True) + + if "target_path" in kwargs and "format" in kwargs: + ha.add_history( + f'corkit/lasco.py CME (object) analysis module, returned the mass given the base {hb["fileorig"]}.' + ) + ha["CME_MASS"] = f"{np.sum(mass):.3e}" + save(kwargs["target_path"], kwargs["format"], mass, ha) + + return mass + elif isinstance(fn, list): + _, sample_hdr = FITS(fn[0]) + detector = sample_hdr["detector"].strip().upper() + out: List[np.array] = [] + + match detector: + case "C2": + vig_fn = os.path.join(DEFAULT_SAVE_DIR, "c2vig_final.fts") + vig_full = fits.getdata(vig_fn) + for filepath in fn: + mass = self.mass( + bn, filepath, **kwargs, detector=detector, vig_full=vig_full + ) + out.append(mass) + case "C3": + vig_pre, vig_post = _read_vig_full() + mask = _read_mask_full() + ramp = _read_ramp_full() + bkg = _read_bkg_full() + args = (vig_pre, vig_post, mask, ramp, bkg) + for filepath in fn: + mass = self.mass( + bn, filepath, *args, **kwargs, detector=detector + ) + out.append(mass) + + return out def plot(self, mass: np.array, fn_header: fits.Header) -> None: """ diff --git a/corkit/utils.py b/corkit/utils.py index 802a5b9..55bced0 100644 --- a/corkit/utils.py +++ b/corkit/utils.py @@ -41,8 +41,10 @@ def datetime_interval( def save_to_fits(img: np.array, header: fits.Header, filepath: str): primary_hdu = fits.PrimaryHDU(img) hdul = fits.HDUList([primary_hdu]) + # Solves writing problems + header["OBT_TIME"] = f'{header["OBT_TIME"]:.2e}' + header["DATASAT"] = f'{header["DATASAT"]:.2e}' hdul[0].header = header - # Writing to filepath hdul.writeto(filepath, overwrite=True) @@ -57,14 +59,20 @@ def save_to_jp2(img, filepath): # done -def save(filepath, filetype, img, header=None): - if filetype == "fits": +def save( + filepath: str, + filename: str, + filetype: str, + img: np.array, + header: fits.Header = None, +) -> None: + if filetype == "fits" or filetype == "fts": assert header is not None, "You missed the header for fits file" - save_to_fits(img, header, filepath) + save_to_fits(img, header, os.path.join(filepath, filename + ".fits")) elif filetype == "jp2": - save_to_jp2(img, filepath) + save_to_jp2(img, os.path.join(filepath, filename + ".jp2")) elif filetype == "png": - save_to_png(img, filepath) + save_to_png(img, os.path.join(filepath, filename + ".png")) # done @@ -225,12 +233,12 @@ def c3_warp(img, header): sumx = header["lebxsum"] * np.maximum(header["sumcol"], 1) sumy = header["lebysum"] * np.maximum(header["sumrow"], 1) if sumx > 1: - x /= sumx + x = x.astype(np.float32) / float(sumx) xc /= sumx x1 /= sumx x2 /= sumx if sumy > 1: - y /= sumy + y = y.astype(np.float32) / float(sumy) yc /= sumy y1 /= sumy y2 /= sumy @@ -249,7 +257,7 @@ def c3_warp(img, header): # Distort the image by shifting locations (x,y) to (x0,y0) and return it: img = warp_tri(x, y, x0, y0, img) header.add_history(f"corkit/utils.py warp_tri: (function) {__version__} 12/04/24") - return img[y1 : y2 + 1, x1 : x2 + 1], header + return img[int(y1) : int(y2 + 1), int(x1) : int(x2 + 1)], header # done @@ -454,12 +462,13 @@ def reduce_std_size( FULL: bool = False, SAVE_HDR: bool = False, ): - sumrow = np.maximum(hdr["sumrow"], 1) - sumcol = np.maximum(hdr["sumcol"], 1) - lebxsum = np.maximum(hdr["lebxsum"], 1) - lebysum = np.maximum(hdr["lebysum"], 1) - naxis1 = hdr["naxis1"] - naxis2 = hdr["naxis2"] + sumrow: int = int(np.maximum(hdr["sumrow"], 1)) + sumcol: int = int(np.maximum(hdr["sumcol"], 1)) + lebxsum: int = int(np.maximum(hdr["lebxsum"], 1)) + lebysum: int = int(np.maximum(hdr["lebysum"], 1)) + naxis1: int = int(hdr["naxis1"]) + naxis2: int = int(hdr["naxis2"]) + tel = hdr["telescop"].strip() if naxis1 <= 0 or naxis2 <= 0: @@ -475,10 +484,7 @@ def reduce_std_size( r1row = hdr["P1ROW"] if BIAS is not None and not NOCAL: - if BIAS == 1: - abias = offset_bias(hdr) - else: - abias = BIAS + abias = offset_bias(hdr) if BIAS == 1 else BIAS if sumcol > 1: img = (img0 - abias) / (sumcol * sumrow) @@ -493,8 +499,8 @@ def reduce_std_size( else: img = img0 - nxfac = 2 ** (sumcol + lebxsum - 2) - nyfac = 2 ** (sumrow + lebysum - 2) + nxfac: int = int(2 ** (sumcol + lebxsum - 2)) + nyfac: int = int(2 ** (sumrow + lebysum - 2)) if tel == "SOHO": if ( @@ -508,11 +514,16 @@ def reduce_std_size( nx = nxfac * naxis1 ny = nyfac * naxis2 + if hdr["r2col"] - r1col + 1 != naxis1 * lebxsum: + r1col -= 32 + if hdr["r2row"] - r1row + 1 != naxis2 * lebysum: + r1row -= 32 + if (nx < 1024 or ny < 1024) and tel == "SOHO": sz = img.shape - full_img = np.zeros((1024 / nyfac, 1024 / nxfac), dtype=img.dtype) - nx = np.minimum(sz[1], 1024) - ny = np.minimum(sz[0], 1024) + full_img = np.zeros((1024 // nyfac, 1024 // nxfac), dtype=img.dtype) + nx: int = int(np.minimum(sz[1], 1024)) + ny: int = int(np.minimum(sz[0], 1024)) naxis1 = 1024 if nxfac < 2 else 512 naxis2 = 1024 if nyfac < 2 else 512 @@ -520,20 +531,22 @@ def reduce_std_size( if r1col < 20: startrow = (offrow - 1) / nyfac - startrow = np.minimum(startrow, 1024 - ny) - full_img[startrow, (r1col - 1) // nxfac] = img[0 : ny - 1, 0 : nx - 1] + startrow: int = int(np.minimum(startrow, 1024 - ny)) + full_img[startrow, (r1col - 1) // nxfac] = img[0:ny, 0:nx] else: - startcol = ( + startcol: int = int( 0 if ((r1col - 20) / nxfac + (nx - 1) > 1024 / nxfac) else (r1col - 20) / nxfac ) - startrow = ( + startrow: int = int( 0 if ((offrow - 1) / nyfac + (ny - 1) > 1024 / nyfac) else (offrow - 1) / nyfac ) - full_img[startrow, startcol] = img[0 : nx - 1, 0 : ny - 1] + full_img[startrow : startrow + ny, startcol : startcol + nx] = img[ + 0:ny, 0:nx + ] hdr["crpix1"] += (r1col - 20) / nxfac hdr["crpix2"] += (offrow - 1) / nyfac else: @@ -1558,8 +1571,7 @@ def rot( def check_05(hdr): try: - hdr["level_1"] - return False + return hdr["level_1"] != 1 except KeyError: return True diff --git a/docs/RELEASES.md b/docs/RELEASES.md index c034df1..94eb08c 100644 --- a/docs/RELEASES.md +++ b/docs/RELEASES.md @@ -65,6 +65,27 @@ ### Fixed - Debugged fits history record. +### Contributors +- [Jorge Enciso](https://github.com/Jorgedavyd) +--- +## 1.0.15 - 2024-04-22 + +### Added +- test.py + - test automation for different fits files dates and formats. +### Changed +- corkit/lasco.py level_1 (function) + - Optimized IN/OUT operations for list of fits_files. +- corkit/lasco.py CME (object) + - Optimized IN/OUT operations for list of fits_files. +### Fixed +- corkit/utils.py reduce_std_size (function) + - Wrong indexing approach fixed. +- corkit/lasco.py level_1 (function) + - Wrong variable sentencing for img/img0, caused trouble for rebin. + - Debugged check 05 implementation on level 1 + - Truncated non-ASCII characters to write files. + ### Contributors - [Jorge Enciso](https://github.com/Jorgedavyd) diff --git a/test.py b/test.py new file mode 100644 index 0000000..bfc3a43 --- /dev/null +++ b/test.py @@ -0,0 +1,58 @@ +import os +import shutil +from datetime import datetime +import asyncio + +scrap_date_list = [ + (datetime(1998, 5, 6), datetime(1998, 5, 7)), # Solar Storm of May 1998 +] + + +tools = ["c2", "c3"] +## Downloader +from corkit.lasco import downloader + +os.makedirs("test", exist_ok=True) + + +async def test_downloader(tool: str) -> None: + down = downloader(tool, "./test") + await down(scrap_date_list) + + +for tool in tools: + asyncio.run(test_downloader(tool)) + +## Checking +from corkit.lasco import level_1 + +path = lambda name: f"test/{name}" + +for name in tools: + file_list = [ + os.path.join(path(name), filename) + for filename in sorted(os.listdir(path(name))) + ] + level_1(file_list, path(name)) + +print("Level 1 calibration test done!") + +from corkit.lasco import CME + +cme = CME() + +bn = lambda name: os.path.join(f"test/{name}", sorted(os.listdir(f"./test/{name}"))[0]) + +for name in tools: + cme.mass( + bn(name), + [ + os.path.join(path(name), filename) + for filename in sorted(os.listdir(path(name))) + ], + ALL=True, + ) + +print("CME analysis test done!") + +shutil.rmtree("./test")