From 779805744d34493e4120b6e5ff33905d4347293c Mon Sep 17 00:00:00 2001 From: malina Date: Fri, 15 Dec 2023 15:12:10 +0100 Subject: [PATCH] Simplification of the code --- pySC/correction/loco_modules.py | 84 ++++----------------------------- pySC/example_hmba.py | 34 ++++++------- 2 files changed, 27 insertions(+), 91 deletions(-) diff --git a/pySC/correction/loco_modules.py b/pySC/correction/loco_modules.py index 491db3c..b3f6f02 100644 --- a/pySC/correction/loco_modules.py +++ b/pySC/correction/loco_modules.py @@ -6,13 +6,12 @@ from pySC.core.beam import bpm_reading import matplotlib.pyplot as plt from scipy.optimize import least_squares -from scipy.stats import linregress from pySC.utils import logging_tools LOGGER = logging_tools.get_logger(__name__) def generating_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind, dk, trackMode='ORB', - useIdealRing=True, skewness=False, order=1, method='add', includeDispersion=False, rf_step=1E3, + useIdealRing=True, skewness=False, order=1, method=SETTING_ADD, includeDispersion=False, rf_step=1E3, cav_ords=None, full_jacobian=True): pool = multiprocessing.Pool() quad_args = [(quad_index, SC, C_model, dkick, used_cor_ind, bpm_indexes, dk, trackMode, useIdealRing, @@ -21,6 +20,7 @@ def generating_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind pool.close() pool.join() if full_jacobian: # # Construct the complete Jacobian matrix for the LOCO + # TODO modify for calibration errors of given size length_corrections = len(np.concatenate(used_cor_ind)) length_bpm = len(bpm_indexes) * 2 return np.concatenate((results, np.tile(C_model, (length_corrections + length_bpm, 1, 1)))) @@ -64,7 +64,7 @@ def measure_closed_orbit_response_matrix(SC, bpm_ords, cm_ords, dkick=1e-5): n_bpms = len(bpm_ords) n_cms = len(cm_ords[0]) + len(cm_ords[1]) response_matrix = np.full((2 * n_bpms * n_turns, n_cms), np.nan) - SC.INJ.trackMode = 'ORB' # TODO modifies SC! + SC.INJ.trackMode = 'ORB' # TODO modifies SC (not a pure function)! closed_orbits0 = bpm_reading(SC, bpm_ords=bpm_ords)[0] cnt = 0 @@ -122,14 +122,12 @@ def loco_correction(objective_function, initial_guess0, orbit_response_matrix_mo t3 = (np.dot(Jt, t2)).reshape(-1) initial_guess1 = initial_guess0 + t3 - t4 = abs(initial_guess1 - initial_guess0) + t4 = np.abs(initial_guess1 - initial_guess0) if max(t4) <= eps: break initial_guess0 = initial_guess1 - # e = np.dot(initial_guess0, J) - t2 - # params_to_check = calculate_parameters(initial_guess0, orbit_response_matrix_model, orbit_response_matrix_measured, J, lengths,including_fit_parameters) - return initial_guess0 # , params_to_check + return initial_guess0 def objective(delta_params, orbit_response_matrix_model, orbit_response_matrix_measured, J, lengths, @@ -164,74 +162,13 @@ def objective(delta_params, orbit_response_matrix_model, orbit_response_matrix_m return residuals.ravel() -def calculate_parameters(parameters, orbit_response_matrix_model, orbit_response_matrix_measured, J, lengths, - including_fit_parameters, W): - model = orbit_response_matrix_model - len_quads = lengths[0] - len_corr = lengths[1] - len_bpm = lengths[2] - - if 'quads' in including_fit_parameters: - delta_g = parameters[:len_quads] - B = np.sum([J[k] * delta_g[k] for k in range(len(delta_g))], axis=0) - model += B - - if 'cor' in including_fit_parameters: - delta_x = parameters[len_quads:len_quads + len_corr] - Co = orbit_response_matrix_model * delta_x - model += Co - - if 'bpm' in including_fit_parameters: - delta_y = parameters[len_quads + len_corr:] - G = orbit_response_matrix_model * delta_y[:, np.newaxis] - model += G - - residuals = orbit_response_matrix_measured - model - # Calculate R-squared - r_squared = r2_score(orbit_response_matrix_measured, model) # , sample_weight = 1) - - # Calculate RMSE - rms = np.sqrt(mean_squared_error(orbit_response_matrix_measured, model)) # , model, sample_weight = 1)) #np.diag(W) - - params_to_check_ = { - 'residulas': residuals, - 'r_squared': r_squared, - 'rmse': rms, - } - return params_to_check_ - - -def set_correction(SC, r, elem_ind, Individuals=True, skewness=False, order=1, method='add', dipole_compensation=True): - if Individuals: - for i in range(len(elem_ind)): - field = elem_ind[i].SCFieldName - # setpoint = fit_parameters.OrigValues[n_group] + damping * ( - # fit_parameters.IdealValues[n_group] - fit_parameters.Values[n_group]) - if field == 'SetPointB': # Normal quadrupole - SC.set_magnet_setpoints(ord, -r[i], False, 1, dipole_compensation=dipole_compensation) - elif field == 'SetPointA': # Skew quadrupole - SC.set_magnet_setpoints(ord, -r[i], True, 1) - - SC.set_magnet_setpoints(elem_ind[i], -r[i], skewness, order, method) +def set_correction(SC, r, elem_ind, individuals=True, skewness=False, order=1, method=SETTING_ADD, dipole_compensation=True): + if individuals: + SC.set_magnet_setpoints(elem_ind, -r, skewness, order, method, dipole_compensation=dipole_compensation) return SC - for quad_fam in range(len(elem_ind)): # TODO this is strange - for quad in quad_fam: - field = elem_ind[quad].SCFieldName - # setpoint = fit_parameters.OrigValues[n_group] + damping * ( - # fit_parameters.IdealValues[n_group] - fit_parameters.Values[n_group]) - if field == 'SetPointB': # Normal quadrupole - SC.set_magnet_setpoints(ord, -r[quad], False, 1, dipole_compensation=dipole_compensation) - elif field == 'SetPointA': # Skew quadrupole - SC.set_magnet_setpoints(ord, -r[quad], True, 1) - - SC.set_magnet_setpoints(elem_ind[quad], -r[quad], skewness, order, method) - return SC - - -def set_correction_(SC, r, elem_ind, skewness=False, order=1, method='add', dipole_compensation=True): - for i in range(len(elem_ind)): - SC.set_magnet_setpoints(elem_ind[i], -r[i], skewness, order, method) + for fam_num, quad_fam in enumerate(elem_ind): + SC.set_magnet_setpoints(quad_fam, -r[fam_num], skewness, order, method, dipole_compensation=dipole_compensation) return SC @@ -272,7 +209,6 @@ def select_equally_spaced_elements(total_elements, num_elements): def get_inverse(Jn, sCut, W): - Nk = len(Jn) A = np.zeros([Nk, Nk]) for i in range(Nk): diff --git a/pySC/example_hmba.py b/pySC/example_hmba.py index d120417..370d494 100644 --- a/pySC/example_hmba.py +++ b/pySC/example_hmba.py @@ -6,7 +6,7 @@ from pySC.utils.sc_tools import SCgetOrds from pySC.utils import logging_tools from pySC.correction.loco_modules import select_equally_spaced_elements, generating_jacobian, measure_closed_orbit_response_matrix, model_beta_beat, \ - loco_correction, set_correction_, objective, get_inverse + loco_correction, set_correction, objective, get_inverse from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion LOGGER = logging_tools.get_logger(__name__) @@ -30,9 +30,9 @@ def create_at_lattice() -> Lattice: ring = at.Lattice(create_at_lattice()) LOGGER.info(f"{len(ring)=}") SC = SimulatedCommissioning(ring) - SC.register_bpms(SCgetOrds(SC.RING, 'BPM'), Roll=0.0, CalError=1E-2 * np.ones(2)) + SC.register_bpms(SCgetOrds(SC.RING, 'BPM'), Roll=0.0, CalError=1E-2 * np.ones(2), NoiseCO=np.array([1e-6, 1E-6])) SC.register_magnets(SCgetOrds(SC.RING, 'QF|QD'), CalErrorB=np.array([0, 1E-2])) # relative - SC.register_magnets(SCgetOrds(SC.RING, 'CXY'), CalErrorA=np.array([1E-200, 0]), CalErrorB=np.array([1E-200, 0])) + SC.register_magnets(SCgetOrds(SC.RING, 'CXY'), CalErrorA=np.array([1E-2, 0]), CalErrorB=np.array([1E-2, 0])) SC.register_magnets(SCgetOrds(SC.RING, 'BEND')) SC.register_magnets(SCgetOrds(SC.RING, 'SF|SD')) # [1/m] SC.register_cavities(SCgetOrds(SC.RING, 'RFC')) @@ -49,7 +49,6 @@ def create_at_lattice() -> Lattice: CAVords = SCgetOrds(SC.RING, 'RFC') quadsOrds = [SCgetOrds(SC.RING, 'QF'), SCgetOrds(SC.RING, 'QD')] - CAVords = SCgetOrds(SC.RING, 'RFCav') CMstep = np.array([1.e-4]) # correctors change [rad] dk = 1.e-4 # quads change @@ -59,28 +58,29 @@ def create_at_lattice() -> Lattice: orbit_response_matrix_model = SCgetModelRM(SC, SC.ORD.BPM, CorOrds, trackMode='ORB', useIdealRing=True, dkick=CMstep) ModelDispersion = SCgetModelDispersion(SC, SC.ORD.BPM, CAVords, trackMode='ORB', Z0=np.zeros(6), nTurns=1, rfStep=RFstep, useIdealRing=True) - - orbit_response_matrix_measured = measure_closed_orbit_response_matrix(SC, SC.ORD.BPM, CorOrds, CMstep) - _, _, twiss_err = at.get_optics(SC.RING, SC.ORD.BPM) - Jn = generating_jacobian(SC, orbit_response_matrix_model, CMstep, CorOrds, SC.ORD.BPM, np.concatenate(quadsOrds), dk, trackMode='ORB', useIdealRing=False, skewness=False, order=1, method='add', includeDispersion=False, rf_step=RFstep, cav_ords=CAVords) - - orbit_response_matrix_measured = measure_closed_orbit_response_matrix(SC, SC.ORD.BPM, CorOrds, CMstep) - - - numberOfIteration = 1 + Jn = np.transpose(Jn, (0, 2, 1)) sCut = 16 W = 1 + from pySC.core.beam import bpm_reading + n_samples = 3 + a = np.empty((n_samples, 2, len(SC.ORD.BPM))) + for i in range(n_samples): + a[i] = bpm_reading(SC)[0] + errors = np.std(a, axis=0) + + Jt = get_inverse(Jn, sCut, W) + _, _, twiss_err = at.get_optics(SC.RING, SC.ORD.BPM) + orbit_response_matrix_measured = measure_closed_orbit_response_matrix(SC, SC.ORD.BPM, CorOrds, CMstep) + numberOfIteration = 1 + for x in range(numberOfIteration): # optics correction using QF and QD LOGGER.info(f'LOCO iteration {x}') C_measure = measure_closed_orbit_response_matrix(SC, SC.ORD.BPM, CorOrds, CMstep) bx_rms_err, by_rms_err = model_beta_beat(SC.RING, twiss, SC.ORD.BPM, makeplot=False) - Jn = np.transpose(Jn, (0, 2, 1)) - Jt = get_inverse(Jn, sCut, W) - quads = len(np.concatenate(quadsOrds)) cor = len(np.concatenate(CorOrds)) bpm = len(SC.ORD.BPM) * 2 @@ -105,7 +105,7 @@ def create_at_lattice() -> Lattice: dx = fit_parameters[lengths[0]:lengths[0] + lengths[1]] dy = fit_parameters[lengths[0] + lengths[1]:] LOGGER.info('SVD') - SC = set_correction_(SC, dg, np.concatenate(quadsOrds)) + SC = set_correction(SC, dg, np.concatenate(quadsOrds)) _, _, twiss_corr = at.get_optics(SC.RING, SC.ORD.BPM) bx_rms_cor, by_rms_cor = model_beta_beat(SC.RING, twiss, SC.ORD.BPM, makeplot=True) LOGGER.info(