diff --git a/omc3/__init__.py b/omc3/__init__.py index dca45f5d..aaef28c5 100644 --- a/omc3/__init__.py +++ b/omc3/__init__.py @@ -11,7 +11,7 @@ __title__ = "omc3" __description__ = "An accelerator physics tools package for the OMC team at CERN." __url__ = "https://github.com/pylhc/omc3" -__version__ = "0.16.0" +__version__ = "0.16.2" __author__ = "pylhc" __author_email__ = "pylhc@github.com" __license__ = "MIT" diff --git a/omc3/hole_in_one.py b/omc3/hole_in_one.py index c52712e6..56d7080f 100644 --- a/omc3/hole_in_one.py +++ b/omc3/hole_in_one.py @@ -610,6 +610,8 @@ def optics_params(): help="Calculate second order dispersion") params.add_parameter(name="chromatic_beating", action="store_true", help="Calculate chromatic beatings: W, PHI and coupling") + params.add_parameter(name="analyse_dpp", type=iotools.OptionalFloat, default=OPTICS_DEFAULTS["analyse_dpp"], + help="Filter files to analyse by this value (phase advance, rdt, crdt).") return params @@ -637,6 +639,7 @@ def optics_params(): "range_of_bpms": 11, "compensation": "model", "rdt_magnet_order": 4, + "analyse_dpp": 0, } diff --git a/omc3/optics_measurements/crdt.py b/omc3/optics_measurements/crdt.py index 38f3aaba..75b1fd22 100644 --- a/omc3/optics_measurements/crdt.py +++ b/omc3/optics_measurements/crdt.py @@ -8,24 +8,38 @@ https://arxiv.org/pdf/1402.1461.pdf. """ from __future__ import annotations + from collections.abc import Sequence from pathlib import Path +from typing import TYPE_CHECKING + import numpy as np import pandas as pd -import tfs import scipy.odr -from omc3.optics_measurements.constants import ERR, EXT, AMPLITUDE, MDL, PHASE, REAL, IMAG -from omc3.utils import iotools, logging_tools -from omc3.utils.stats import circular_nanmean, circular_nanerror -from omc3.definitions.constants import PLANES, PI2 -from omc3.harpy.constants import COL_AMP, COL_MU, COL_PHASE, COL_ERR -from omc3.optics_measurements.rdt import get_line_sign_and_suffix +import tfs -from typing import TYPE_CHECKING +from omc3.definitions.constants import PI2, PLANES +from omc3.harpy.constants import COL_AMP, COL_ERR, COL_MU, COL_PHASE +from omc3.optics_measurements.constants import ( + AMPLITUDE, + ERR, + EXT, + IMAG, + MDL, + PHASE, + REAL, +) +from omc3.optics_measurements.data_models import ( + InputFiles, + check_and_warn_about_offmomentum_data, + filter_for_dpp, +) +from omc3.optics_measurements.rdt import get_line_sign_and_suffix +from omc3.utils import iotools, logging_tools +from omc3.utils.stats import circular_nanerror, circular_nanmean if TYPE_CHECKING: - from generic_parser import DotDict - from omc3.optics_measurements.data_models import InputFiles + from generic_parser import DotDict LOGGER = logging_tools.get_logger(__name__) @@ -57,11 +71,12 @@ def calculate(measure_input: DotDict, input_files: InputFiles, invariants, header): """ Calculate the CRDT values. """ LOGGER.info("Start of CRDT analysis") - - # Todo: only on-momentum required? If not, remove this or set `dpp_value=None`, see https://github.com/pylhc/omc3/issues/456 - # For now: use only the actions belonging to the current dpp value! - dpp_value = 0 - invariants = {plane: inv[input_files.dpp_frames_indices(plane, dpp_value)] for plane, inv in invariants.items()} + dpp_value = measure_input.analyse_dpp + if dpp_value is None: + for plane in PLANES: + check_and_warn_about_offmomentum_data(input_files, plane, id_="CRDT calculation") + else: + invariants = filter_for_dpp(invariants, input_files, dpp_value) assert len(input_files['X']) == len(input_files['Y']) bpm_names = input_files.bpms(dpp_value=0) diff --git a/omc3/optics_measurements/data_models.py b/omc3/optics_measurements/data_models.py index 4922d6f6..9a697e74 100644 --- a/omc3/optics_measurements/data_models.py +++ b/omc3/optics_measurements/data_models.py @@ -24,8 +24,6 @@ LOGGER = logging_tools.get_logger(__name__) -DPP_TOLERANCE = 1e-6 - class InputFiles(dict): """ Stores the input files, provides methods to gather quantity specific data @@ -96,7 +94,7 @@ def dpp_frames_indices(self, plane: str, dpp_value: float | None): if dpp_value is None: return list(range(len(self[plane]))) - return np.argwhere(np.abs(self.dpps(plane) - dpp_value) < DPP_TOLERANCE).T[0] + return np.argwhere(np.abs(self.dpps(plane) - dpp_value) < dpp.DPP_TOLERANCE).T[0] def _all_frames(self, plane: str): return self[plane] @@ -219,3 +217,40 @@ def get_data(frame, column) -> np.ndarray: """ columns = InputFiles.get_columns(frame, column) return frame.loc[:, columns].to_numpy(dtype=np.float64) + + +# DPP Filtering related functions ------------------------------------------------------------------ + +def check_and_warn_about_offmomentum_data(input_files: InputFiles, plane: str, id_: str = None): + """ A helper function to check if off-momentum data is present in the input files, + but no dpp-value is given by the user. + + See https://github.com/pylhc/omc3/issues/456 . + """ + on_momentum_files = input_files.dpp_frames_indices(plane, dpp_value=0) + if len(on_momentum_files) == len(input_files[plane]): + return # no off-momentum data, nothing to warn + + msg = ( + "Off-momentum files for analysis found!\n" + "They will be included" + ) + + if id_ is not None: + msg += f" in the {id_}, which " + + msg += ( + ", which can make the results more inaccurate.\n" + "To avoid, specify `analyse_dpp` or run the analysis only on on-momentum files" + ) + + if id_ is not None: + msg += f" or possibly deactivate the {id_}" + + msg += "." + LOGGER.warning(msg) + + +def filter_for_dpp(to_filter: dict[str, Sequence], input_files: InputFiles, dpp_value: float): + """ Filter the given data for the given dpp-value. """ + return {plane: values[input_files.dpp_frames_indices(plane, dpp_value)] for plane, values in to_filter.items()} \ No newline at end of file diff --git a/omc3/optics_measurements/dpp.py b/omc3/optics_measurements/dpp.py index 51f6d912..76951410 100644 --- a/omc3/optics_measurements/dpp.py +++ b/omc3/optics_measurements/dpp.py @@ -5,25 +5,35 @@ This module contains deltap over p calculations related functionality of ``optics_measurements``. It provides functions to computes and arrange dp over p. """ +from __future__ import annotations + +from typing import TYPE_CHECKING +from collections.abc import Sequence import logging import numpy as np import pandas as pd -DPP_TOLERANCE = 1e-4 -AMP_DPP_TOLERANCE = 1e-5 +if TYPE_CHECKING: + import tfs + from generic_parser import DotDict + from omc3.optics_measurements.data_models import InputFiles + +DPP_BIN_TOLERANCE: float = 1e-4 +DPP_TOLERANCE: float = 1e-5 # not sure if these should be different! (jdilly) + LOGGER = logging.getLogger(__name__) -def arrange_dpps(dpps): +def arrange_dpps(dpps: Sequence[float], tolerance: float = DPP_BIN_TOLERANCE): """ Grouping of dpp-values and averaging them in the bins, also zeroes the bin closest to zero. """ closest_to_zero = np.argmin(np.abs(dpps)) - ranges = _compute_ranges(dpps) + ranges = _compute_ranges(dpps, tolerance) zero_offset = np.mean(_values_in_range(_find_range_with_element(ranges, closest_to_zero), dpps)) LOGGER.debug(f"dp/p closest to zero is {dpps[closest_to_zero]}") - if np.abs(zero_offset) > DPP_TOLERANCE: + if np.abs(zero_offset) > tolerance: LOGGER.warning(f"Analysed files have large momentum deviation {zero_offset}. " f"Optics parameters might be wrong.") LOGGER.debug(f"Detected dpp differences, aranging as: {ranges}, zero offset: {zero_offset}.") @@ -42,32 +52,42 @@ def _find_range_with_element(ranges, element): return [dpp_range for dpp_range in ranges if element in dpp_range][0] -def _compute_ranges(dpps): +def _compute_ranges(dpps: Sequence[float], tolerance: float) -> list[list[int]]: + """ Groups the indices of dpps in bins of DPP_TOLERANCE. + The function first sorts the indices by their dpp value + adds a new group whenever the dpp of the next index is larger than the first value in the group. + + Works for now, but we could improve by using the mean dpp of the group to check against. + Only neccessary if we have some weird outliers. (jdilly) + """ ordered_ids = np.argsort(dpps) ranges = [[ordered_ids[0]]] if len(ordered_ids) == 1: return ranges for idx in ordered_ids[1:]: - if abs(dpps[ranges[-1][0]] - dpps[idx]) < DPP_TOLERANCE: + if abs(dpps[ranges[-1][0]] - dpps[idx]) < tolerance: ranges[-1].append(idx) else: ranges.append([idx]) return ranges -def append_amp_dpp(list_of_tfs, dpp_values): +def append_amp_dpp(list_of_tfs: Sequence[tfs.TfsFile], dpp_values: Sequence[float]): + """ Add the dpp values to the DPP-header of the tfs files, if larger than the DPP-tolerance, otherwise set to zero. + This is intended to the DPP value for on-momentum files to zero. """ for i, dpp in enumerate(dpp_values): - list_of_tfs[i].headers["DPPAMP"] = dpp if (not np.isnan(dpp) and np.abs(dpp) > AMP_DPP_TOLERANCE) else 0.0 + list_of_tfs[i].headers["DPPAMP"] = dpp if (not np.isnan(dpp) and np.abs(dpp) > DPP_TOLERANCE) else 0.0 return list_of_tfs -def append_dpp(list_of_tfs, dpp_values): +def append_dpp(list_of_tfs: Sequence[tfs.TfsFile], dpp_values: Sequence[float]): + """ Add the dpp values to the DPP-header of the tfs files. """ for i, dpp in enumerate(dpp_values): list_of_tfs[i].headers["DPP"] = dpp return list_of_tfs -def calculate_dpoverp(input_files, meas_input): +def calculate_dpoverp(input_files: InputFiles, meas_input: DotDict): df_orbit = pd.DataFrame(meas_input.accelerator.model).loc[:, ['S', 'DX']] df_orbit = pd.merge(df_orbit, input_files.joined_frame('X', ['CO', 'CORMS']), how='inner', left_index=True, right_index=True) @@ -85,7 +105,7 @@ def calculate_dpoverp(input_files, meas_input): return numer / denom -def calculate_amp_dpoverp(input_files, meas_input): +def calculate_amp_dpoverp(input_files: InputFiles, meas_input: DotDict): df_orbit = pd.DataFrame(meas_input.accelerator.model).loc[:, ['S', 'DX']] df_orbit = pd.merge(df_orbit, input_files.joined_frame('X', ['AMPX', 'AMPZ']), how='inner', left_index=True, right_index=True) @@ -105,3 +125,5 @@ def calculate_amp_dpoverp(input_files, meas_input): else: numer = np.sum(dispersions[:, None] * amps[mask_zeros, :], axis=0) return numer / denom + + diff --git a/omc3/optics_measurements/phase.py b/omc3/optics_measurements/phase.py index 53c94998..b2195302 100644 --- a/omc3/optics_measurements/phase.py +++ b/omc3/optics_measurements/phase.py @@ -6,31 +6,49 @@ It provides functions to compute betatron phase advances and structures to store them. """ from __future__ import annotations + from pathlib import Path +from typing import TYPE_CHECKING, Any import numpy as np import pandas as pd import tfs -from omc3.optics_measurements.constants import (DELTA, ERR, EXT, MDL, PHASE_NAME, SPECIAL_PHASE_NAME, - TOTAL_PHASE_NAME) +from omc3.optics_measurements.constants import ( + DELTA, + ERR, + EXT, + MDL, + PHASE_NAME, + SPECIAL_PHASE_NAME, + TOTAL_PHASE_NAME, +) +from omc3.optics_measurements.data_models import ( + InputFiles, + check_and_warn_about_offmomentum_data, +) from omc3.optics_measurements.toolbox import ang_sum, df_ang_diff, df_diff from omc3.utils import logging_tools, stats -from typing import TYPE_CHECKING - if TYPE_CHECKING: - from generic_parser import DotDict - from omc3.optics_measurements.data_models import InputFiles - from omc3.model.accelerators.accelerator import Accelerator + from collections.abc import Sequence + + from generic_parser import DotDict from numpy.typing import ArrayLike + from omc3.model.accelerators.accelerator import Accelerator + from omc3.optics_measurements.tune import TuneDict + LOGGER = logging_tools.get_logger(__name__) def calculate( - meas_input: DotDict, input_files: InputFiles, tunes, plane, no_errors=False + meas_input: DotDict, + input_files: InputFiles, + tunes: TuneDict, + plane: str, + no_errors: bool = False, ) -> dict[str, tuple[dict[str, tfs.TfsDataFrame], tfs.TfsDataFrame]]: """ Calculate phases for 'free' and 'uncompensated' cases from the measurement files, and return a @@ -48,32 +66,30 @@ def calculate( is a dictionary with the measured phase advances for 'free' and 'uncompensated' cases, as well as the location of the output ``TfsDataFrames`` for the phases. """ + phase_advances, dfs = _calculate_with_compensation( + meas_input, + input_files, + tunes, + plane, + meas_input.accelerator.model, + meas_input.compensation, + no_errors, + ) + if meas_input.compensation == "none": - phase_advances, dfs = _calculate_with_compensation(meas_input, - input_files, - tunes, - plane, - meas_input.accelerator.model, - 'none', - no_errors) uncompensated_phase_advances = phase_advances else: - phase_advances, free_dfs = _calculate_with_compensation(meas_input, - input_files, - tunes, - plane, - meas_input.accelerator.model, - meas_input.compensation, - no_errors) LOGGER.info("-- Run uncompensated") - uncompensated_phase_advances, drv_dfs = _calculate_with_compensation(meas_input, - input_files, - tunes, - plane, - meas_input.accelerator.model_driven, - 'none', - no_errors) - dfs = free_dfs + drv_dfs + uncompensated_phase_advances, drv_dfs = _calculate_with_compensation( + meas_input, + input_files, + tunes, + plane, + meas_input.accelerator.model_driven, + "none", + no_errors, + ) + dfs = dfs + drv_dfs if len(phase_advances["MEAS"].index) < 3: @@ -90,7 +106,15 @@ def calculate( -def _calculate_with_compensation(meas_input, input_files, tunes, plane, model_df, compensation='none', no_errors=False): +def _calculate_with_compensation( + meas_input: DotDict, + input_files: InputFiles, + tunes: TuneDict, + plane: str, + model_df: pd.DataFrame, + compensation: str = "none", + no_errors: bool = False, +): """ Calculates phase advances. @@ -129,7 +153,11 @@ def _calculate_with_compensation(meas_input, input_files, tunes, plane, model_df df = model_df.loc[:, ["S", f"MU{plane}"]] how = 'outer' if meas_input.union else 'inner' - dpp_value = meas_input.dpp if "dpp" in meas_input.keys() else 0 + + dpp_value = meas_input.analyse_dpp + if dpp_value is None: + check_and_warn_about_offmomentum_data(input_files, plane, id_="Phase calculations") + df = pd.merge(df, input_files.joined_frame(plane, [f"MU{plane}", f"{ERR}MU{plane}"], dpp_value=dpp_value, how=how), how='inner', left_index=True, right_index=True) @@ -170,7 +198,7 @@ def _calculate_with_compensation(meas_input, input_files, tunes, plane, model_df _create_output_df(phase_advances, df, plane, tot=True)] -def _compensate_by_equation(phases_meas: ArrayLike, plane, tunes): +def _compensate_by_equation(phases_meas: ArrayLike, plane: str, tunes: TuneDict) -> ArrayLike: driven_tune, free_tune, ac2bpmac = tunes[plane]["Q"], tunes[plane]["QF"], tunes[plane]["ac2bpm"] k_bpmac = ac2bpmac[2] phase_corr = ac2bpmac[1] - phases_meas[k_bpmac] + (0.5 * driven_tune) @@ -183,7 +211,7 @@ def _compensate_by_equation(phases_meas: ArrayLike, plane, tunes): return phases_meas -def _compensate_by_model(input_files, meas_input, df, plane): +def _compensate_by_model(input_files: InputFiles, meas_input: DotDict, df: pd.DataFrame, plane: str): df = pd.merge(df, pd.DataFrame(meas_input.accelerator.model_driven.loc[:, [f"MU{plane}"]]), how='inner', left_index=True, right_index=True, suffixes=("", "comp")) phase_compensation = df_diff(df, f"MU{plane}", f"MU{plane}comp") @@ -192,7 +220,7 @@ def _compensate_by_model(input_files, meas_input, df, plane): return df -def write(dfs, headers, output, plane): +def write(dfs: Sequence[pd.DataFrame], headers: Sequence[dict[str, Any]], output: str | Path, plane: str): LOGGER.info(f"Writing phases: {len(dfs)}") for head, df, name in zip(headers, dfs, (PHASE_NAME, TOTAL_PHASE_NAME, PHASE_NAME+"driven_", TOTAL_PHASE_NAME+"driven_")): tfs.write(Path(output) / f"{name}{plane.lower()}{EXT}", df, head) @@ -230,7 +258,7 @@ def _get_all_phase_diff(phases_a: ArrayLike, phases_b: ArrayLike = None): return (phases_a[np.newaxis, :] - phases_b[:, np.newaxis]) % 1.0 -def _get_square_data_frame(data, index): +def _get_square_data_frame(data, index) -> pd.DataFrame: return pd.DataFrame(data=data, index=index, columns=index) diff --git a/omc3/optics_measurements/rdt.py b/omc3/optics_measurements/rdt.py index 0ac8b482..1984119c 100644 --- a/omc3/optics_measurements/rdt.py +++ b/omc3/optics_measurements/rdt.py @@ -22,17 +22,19 @@ from omc3.optics_measurements.tune import TuneDict from omc3.utils import iotools, logging_tools, stats from optics_functions.rdt import get_all_to_order, jklm2str +from omc3.optics_measurements.data_models import InputFiles, check_and_warn_about_offmomentum_data, filter_for_dpp -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING if TYPE_CHECKING: from generic_parser import DotDict - from omc3.optics_measurements.data_models import InputFiles + from numpy.typing import ArrayLike NBPMS_FOR_90 = 3 LOGGER = logging_tools.get_logger(__name__) RDTTuple = tuple[int, int, int, int] +LineTuple = tuple[int, int, int] def _generate_plane_rdts(order: int) -> tuple[dict[str, list[RDTTuple]], dict[str, list[RDTTuple]]]: @@ -99,10 +101,17 @@ def calculate( meas_input = deepcopy(measure_input) meas_input["compensation"] = "none" LOGGER.info(f"Calculating RDTs up to magnet order {meas_input['rdt_magnet_order']}") + + dpp_value = meas_input.analyse_dpp + if dpp_value is None: + for plane in PLANES: + check_and_warn_about_offmomentum_data(input_files, plane, id_="RDT calculation") + else: + invariants = filter_for_dpp(invariants, input_files, dpp_value) single_plane_rdts, double_plane_rdts = _generate_plane_rdts(meas_input["rdt_magnet_order"]) for plane in PLANES: - bpm_names = input_files.bpms(plane=plane, dpp_value=0) + bpm_names = input_files.bpms(plane=plane, dpp_value=dpp_value) for_rdts = _best_90_degree_phases(meas_input, bpm_names, phases, tunes, plane) LOGGER.info(f"Average phase advance between BPM pairs: {for_rdts.loc[:,'MEAS'].mean()}") for rdt in single_plane_rdts[plane]: @@ -110,7 +119,7 @@ def calculate( df = _process_rdt(meas_input, input_files, for_rdts, invariants, plane, rdt) write(df, add_freq_to_header(header, plane, rdt), meas_input, plane, rdt) for plane in PLANES: - bpm_names = input_files.bpms(dpp_value=0) + bpm_names = input_files.bpms(dpp_value=dpp_value) for_rdts = _best_90_degree_phases(meas_input, bpm_names, phases, tunes, plane) LOGGER.info(f"Average phase advance between BPM pairs: {for_rdts.loc[:, 'MEAS'].mean()}") for rdt in double_plane_rdts[plane]: @@ -185,7 +194,7 @@ def _get_n_upper_diagonals(n, shape): return diags(np.ones((n, shape[0])), np.arange(n)+1, shape=shape).toarray() -def _determine_line(rdt: RDTTuple, plane: str): +def _determine_line(rdt: RDTTuple, plane: str) -> dict[str, tuple[int, int, int]]: j, k, l, m = rdt # noqa: E741 lines = dict(X=(1 - j + k, m - l, 0), Y=(k - j, 1 - l + m, 0)) @@ -201,10 +210,7 @@ def add_freq_to_header(header, plane, rdt): def _process_rdt(meas_input: DotDict, input_files: InputFiles, phase_data, invariants, plane, rdt: RDTTuple): - # Todo: only on-momentum required? If not, remove this or set `dpp_value=None`, see https://github.com/pylhc/omc3/issues/456 - # For now: use only the actions belonging to the current dpp value! - dpp_value = 0 - invariants = {plane: inv[input_files.dpp_frames_indices(plane, dpp_value)] for plane, inv in invariants.items()} + dpp_value = meas_input.analyse_dpp df = pd.DataFrame(phase_data) second_bpms = df.loc[:, "NAME2"].to_numpy() @@ -220,14 +226,16 @@ def _process_rdt(meas_input: DotDict, input_files: InputFiles, phase_data, invar # Multiples of tunes needs to be added to phase at second BPM if that is in second turn comp_coeffs2 = to_complex( df_all_amps.loc[second_bpms, :], - _add_tunes_if_in_second_turn(df, input_files, line, df_all_phases.loc[second_bpms, :].to_numpy()) + _add_tunes_if_in_second_turn( + df, input_files, line, df_all_phases.loc[second_bpms, :].to_numpy(), dpp_value + ) ) # Get amplitude and phase of the line from linx/liny file line_amp, line_phase, line_amp_e, line_phase_e = complex_secondary_lines( # TODO use the errors df.loc[:, "MEAS"].to_numpy()[:, np.newaxis] * meas_input.accelerator.beam_direction, df.loc[:, "ERRMEAS"].to_numpy()[:, np.newaxis], comp_coeffs1, comp_coeffs2) - rdt_phases_per_file = _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase) + rdt_phases_per_file = _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase, dpp_value) rdt_angles = stats.circular_mean(rdt_phases_per_file, period=1, axis=1) % 1 df[PHASE] = rdt_angles df[f"{ERR}{PHASE}"] = stats.circular_error(rdt_phases_per_file, period=1, axis=1) @@ -238,20 +246,20 @@ def _process_rdt(meas_input: DotDict, input_files: InputFiles, phase_data, invar return df.loc[:, ["S", "COUNT", AMPLITUDE, f"{ERR}{AMPLITUDE}", "PHASE", f"{ERR}PHASE", "REAL", "IMAG"]] -def _add_tunes_if_in_second_turn(df, input_files, line, phase2): +def _add_tunes_if_in_second_turn(df: pd.DataFrame, input_files: InputFiles, line, phase2, dpp_value): mask = df_diff(df, "S", "S2") > 0 - tunes = np.empty((2, len(input_files.dpp_frames("X", 0)))) + tunes = np.empty((2, len(input_files.dpp_frames("X", dpp_value)))) for i, plane in enumerate(PLANES): - tunes[i] = np.array([lin.headers[f"Q{i+1}"] for lin in input_files.dpp_frames(plane, 0)]) + tunes[i] = np.array([lin.headers[f"Q{i+1}"] for lin in input_files.dpp_frames(plane, dpp_value)]) phase2[mask, :] = phase2[mask, :] + line[0] * tunes[0] + line[1] * tunes[1] return phase2 -def _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase): - phases = np.zeros((2, df.index.size, len(input_files.dpp_frames("X", 0)))) +def _calculate_rdt_phases_from_line_phases(df, input_files, line, line_phase, dpp_value): + phases = np.zeros((2, df.index.size, len(input_files.dpp_frames("X", dpp_value)))) for i, plane in enumerate(PLANES): if line[i] != 0: - phases[i] = input_files.joined_frame(plane, [f"MU{plane}"], dpp_value=0).loc[df.index, :].to_numpy() % 1 + phases[i] = input_files.joined_frame(plane, [f"MU{plane}"], dpp_value=dpp_value).loc[df.index, :].to_numpy() % 1 return line_phase - line[0] * phases[0] - line[1] * phases[1] + 0.25 @@ -287,7 +295,7 @@ def get_linearized_problem(invs: dict[str, np.ndarray], plane: str, rdt: RDTTupl return 2 * l * act_x ** (j + k) * act_y ** (l + m - 2) -def get_line_sign_and_suffix(line, input_files, plane): +def get_line_sign_and_suffix(line: LineTuple, input_files: InputFiles, plane: str): suffix = f"{line[0]}{line[1]}".replace("-", "_") conj_suffix = f"{-line[0]}{-line[1]}".replace("-", "_") if f"AMP{suffix}" in input_files[plane][0].columns: @@ -301,7 +309,7 @@ def get_line_sign_and_suffix(line, input_files, plane): raise ValueError(msg) -def complex_secondary_lines(phase_adv, err_padv, sig1, sig2): +def complex_secondary_lines(phase_adv: ArrayLike[float], err_padv: ArrayLike[float], sig1: ArrayLike[complex], sig2: ArrayLike[complex]): """ Args: @@ -323,7 +331,7 @@ def complex_secondary_lines(phase_adv, err_padv, sig1, sig2): np.abs(esig), (np.angle(esig) / tp) % 1) -def to_complex(amplitudes, phases, period=1): +def to_complex(amplitudes: ArrayLike, phases: ArrayLike, period: float = 1): try: amplitudes = amplitudes.to_numpy() except AttributeError: diff --git a/omc3/optics_measurements/tune.py b/omc3/optics_measurements/tune.py index 8ce2910d..cd3df2b7 100644 --- a/omc3/optics_measurements/tune.py +++ b/omc3/optics_measurements/tune.py @@ -9,21 +9,27 @@ import pandas as pd from omc3.definitions.constants import PLANES, PLANE_TO_NUM +from omc3.optics_measurements.data_models import check_and_warn_about_offmomentum_data from omc3.utils import stats from typing import TYPE_CHECKING if TYPE_CHECKING: + from generic_parser import DotDict from omc3.optics_measurements.data_models import InputFiles -def calculate(measure_input, input_files: InputFiles): +def calculate(measure_input: DotDict, input_files: InputFiles): tune_d = TuneDict() accelerator = measure_input.accelerator for plane in PLANES: + dpp_value = measure_input.analyse_dpp + if dpp_value is None: + check_and_warn_about_offmomentum_data(input_files, plane, id_="Tune calculations") + tune_d[plane]["QM"] = accelerator.model.headers[f"Q{PLANE_TO_NUM[plane]}"] - tune_list = [df.headers[f"Q{PLANE_TO_NUM[plane]}"] for df in input_files.dpp_frames(plane, 0)] - tune_rms_list = [df.headers[f"Q{PLANE_TO_NUM[plane]}RMS"] for df in input_files.dpp_frames(plane, 0)] + tune_list = [df.headers[f"Q{PLANE_TO_NUM[plane]}"] for df in input_files.dpp_frames(plane, dpp_value)] + tune_rms_list = [df.headers[f"Q{PLANE_TO_NUM[plane]}RMS"] for df in input_files.dpp_frames(plane, dpp_value)] measured_tune = stats.weighted_mean(np.array(tune_list), errors=np.array(tune_rms_list)) tune_d[plane]["Q"], tune_d[plane]["QF"] = measured_tune, measured_tune tune_d[plane]["QFM"] = accelerator.nat_tunes[PLANE_TO_NUM[plane] - 1] @@ -32,7 +38,7 @@ def calculate(measure_input, input_files: InputFiles): tune_d[plane]["QF"] = tune_d[plane]["Q"] - tune_d[plane]["QM"] + tune_d[plane]["QFM"] if measure_input.compensation == "equation": tune_d[plane]["ac2bpm"] = tune_d.phase_ac2bpm( - input_files.joined_frame(plane, [f"MU{plane}"], dpp_value=0, how='inner'), + input_files.joined_frame(plane, [f"MU{plane}"], dpp_value=dpp_value, how='inner'), plane, measure_input.accelerator) return tune_d diff --git a/tests/unit/test_hole_in_one.py b/tests/unit/test_hole_in_one.py index b39eba11..5014635e 100644 --- a/tests/unit/test_hole_in_one.py +++ b/tests/unit/test_hole_in_one.py @@ -23,6 +23,7 @@ from __future__ import annotations from pathlib import Path +import numpy as np import pytest from omc3.hole_in_one import hole_in_one_entrypoint, DEFAULT_CONFIG_FILENAME, LINFILES_SUBFOLDER @@ -49,7 +50,8 @@ @pytest.mark.extended @pytest.mark.parametrize("which_files", ("SINGLE", "0Hz", "all")) @pytest.mark.parametrize("clean", (True, False), ids=ids_str("clean={}")) -def test_hole_in_two(tmp_path, clean, which_files): +@pytest.mark.parametrize("dpp", (0, None), ids=ids_str("dpp={}")) +def test_hole_in_two(tmp_path, clean, which_files, dpp, caplog): """ Test that is closely related to how actual analysis are done. """ @@ -89,11 +91,14 @@ def test_hole_in_two(tmp_path, clean, which_files): _check_nonlinear_optics_files(optics_output, "rdt") _check_nonlinear_optics_files(optics_output, "crdt") + _check_caplog_for_rdt_warnings(caplog, to_be_found=(which_files == "all") and (dpp is None)) + @pytest.mark.extended @pytest.mark.parametrize("which_files", ("SINGLE", "0Hz", "all")) @pytest.mark.parametrize("clean", (True, False), ids=ids_str("clean={}")) -def test_hole_in_one(tmp_path, clean, which_files): +@pytest.mark.parametrize("dpp", (0, None), ids=ids_str("dpp={}")) +def test_hole_in_one(tmp_path, clean, which_files, dpp, caplog): """ This test runs harpy, optics and optics analysis in one. """ @@ -119,6 +124,7 @@ def test_hole_in_one(tmp_path, clean, which_files): accel="lhc", year="2022", model_dir=MODEL_DIR, + analyse_dpp=dpp, ) for sdds_file in files: @@ -127,6 +133,8 @@ def test_hole_in_one(tmp_path, clean, which_files): _check_linear_optics_files(output, off_momentum=(which_files == "all")) _check_nonlinear_optics_files(output, "rdt") _check_nonlinear_optics_files(output, "crdt") + + _check_caplog_for_rdt_warnings(caplog, to_be_found=(which_files == "all") and (dpp is None)) # Helper ----------------------------------------------------------------------- @@ -178,6 +186,22 @@ def _check_nonlinear_optics_files(outputdir: Path, type_: str): assert len(list(magnet_dir.glob(f"*{EXT}"))) > 0 +def _check_caplog_for_rdt_warnings(caplog, to_be_found: bool = False): + found = {"RDT": 0, "CRDT": 0, "Tune": 0, "Phase": 0} + + for record in caplog.records: + for key in found.keys(): + if f"included in the {key} calculation" in record.msg: + assert to_be_found, "Warnings still present, but should not have been!" + assert "Off-momentum files for analysis found!" in record.msg + assert record.levelname == "WARNING" + found[key] += 1 # should be twice, once per plane + if np.all(np.array(found.values()) == 2): + break + else: + assert not to_be_found, "Expected warnings not found!" + + def _get_sdds_files(which: str) -> list[Path]: if which in SDDS_FILES.keys(): return [SDDS_DIR / sdds_file for sdds_file in SDDS_FILES[which]]