diff --git a/src/essreflectometry/amor/beamline.py b/src/essreflectometry/amor/beamline.py index 2599c79..be4e94d 100644 --- a/src/essreflectometry/amor/beamline.py +++ b/src/essreflectometry/amor/beamline.py @@ -10,6 +10,8 @@ @log_call( instrument='amor', message='Constructing AMOR beamline from default parameters' ) +BeamlineParams = NewType('Beamline', dict) + def make_beamline( sample_rotation: sc.Variable, beam_size: sc.Variable = None, @@ -20,7 +22,7 @@ def make_beamline( chopper_phase: sc.Variable = None, chopper_1_position: sc.Variable = None, chopper_2_position: sc.Variable = None, -) -> dict: +) -> BeamlineParams: """ Amor beamline components. @@ -91,7 +93,7 @@ def make_beamline( position=chopper_1_position, ) ) - return beamline + return BeamlineParams(beamline) @log_call(instrument='amor', level='DEBUG') diff --git a/src/essreflectometry/amor/conversions.py b/src/essreflectometry/amor/conversions.py index 8623faf..3a643a2 100644 --- a/src/essreflectometry/amor/conversions.py +++ b/src/essreflectometry/amor/conversions.py @@ -1,8 +1,10 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2023 Scipp contributors (https://github.com/scipp) +from typing import NewType import scipp as sc from ..reflectometry.conversions import specular_reflection as spec_relf_graph +from ..reflectometry.types import SpecularReflectionCoordTransformGraph def incident_beam( @@ -22,10 +24,10 @@ def incident_beam( return sample_position - chopper_midpoint -def specular_reflection() -> dict: +def specular_reflection() -> SpecularReflectionCoordTransformGraph: """ Generate a coordinate transformation graph for Amor reflectometry. """ graph = spec_relf_graph() graph['incident_beam'] = incident_beam - return graph + return SpecularReflectionCoordTransformGraph(graph) diff --git a/src/essreflectometry/amor/load.py b/src/essreflectometry/amor/load.py index 42f27a5..5042977 100644 --- a/src/essreflectometry/amor/load.py +++ b/src/essreflectometry/amor/load.py @@ -2,13 +2,14 @@ # Copyright (c) 2023 Scipp contributors (https://github.com/scipp) from datetime import datetime from pathlib import Path -from typing import Any, Optional, Union +from typing import Any, Optional, Union, NewType, TypeVar import scipp as sc import scippnexus as snx from ..logging import get_logger -from .beamline import make_beamline +from .beamline import make_beamline, BeamlineParams +from .types import Run, Raw, Filename def _tof_correction(data: sc.DataArray, dim: str = 'tof') -> sc.DataArray: @@ -83,19 +84,13 @@ def _load_nexus_entry(filename: Union[str, Path]) -> sc.DataGroup: return f['entry'][()] -def load( - filename: Union[str, Path], - orso: Optional[Any] = None, - beamline: Optional[dict] = None, -) -> sc.DataArray: +def load(filename: Filename[Run], beamline: BeamlineParams) -> Raw[Run]: """Load a single Amor data file. Parameters ---------- filename: Path of the file to load. - orso: - The orso object to be populated by additional information from the loaded file. beamline: A dict defining the beamline parameters. @@ -124,16 +119,15 @@ def load( ) # Add beamline parameters - beamline = make_beamline() if beamline is None else beamline for key, value in beamline.items(): data.coords[key] = value - if orso is not None: - populate_orso(orso=orso, data=full_data, filename=filename) - data.attrs['orso'] = sc.scalar(orso) + # if orso is not None: + # populate_orso(orso=orso, data=full_data, filename=filename) + # data.attrs['orso'] = sc.scalar(orso) # Perform tof correction and fold two pulses - return _tof_correction(data) + return Raw[Run](_tof_correction(data)) def populate_orso(orso: Any, data: sc.DataGroup, filename: str) -> Any: diff --git a/src/essreflectometry/reflectometry/conversions.py b/src/essreflectometry/reflectometry/conversions.py index bbabb27..169cc9e 100644 --- a/src/essreflectometry/reflectometry/conversions.py +++ b/src/essreflectometry/reflectometry/conversions.py @@ -4,6 +4,15 @@ from scipp.constants import h, m_n, pi from scippneutron._utils import elem_dtype, elem_unit from scippneutron.conversion.graph import beamline, tof +from .types import ( + ThetaBins, + WavelengthBins, + SpecularReflectionCoordTransformGraph, + ThetaData, + WavelengthData, + Run, + Raw, +) from . import orso @@ -86,7 +95,7 @@ def reflectometry_q(wavelength: sc.Variable, theta: sc.Variable) -> sc.Variable: return c * sc.sin(theta.astype(dtype, copy=False)) / wavelength -def specular_reflection() -> dict: +def specular_reflection() -> SpecularReflectionCoordTransformGraph: """ Generate a coordinate transformation graph for specular reflection reflectometry. @@ -101,12 +110,14 @@ def specular_reflection() -> dict: "theta": theta, "Q": reflectometry_q, } - return graph + return SpecularReflectionCoordTransformGraph(graph) def tof_to_wavelength( - data_array: sc.DataArray, wavelength_edges: sc.Variable = None, graph: dict = None -) -> sc.DataArray: + data_array: Raw[Run], + wavelength_edges: WavelengthBins, + graph: SpecularReflectionCoordTransformGraph, +) -> WavelengthData[Run]: """ Use :code:`transform_coords` to convert from ToF to wavelength, cutoff high and low limits for wavelength, and add necessary ORSO metadata. @@ -126,32 +137,33 @@ def tof_to_wavelength( : New data array with wavelength dimension. """ - graph = graph if graph is not None else specular_reflection() data_array_wav = data_array.transform_coords(["wavelength"], graph=graph) - if wavelength_edges is not None: - data_array_wav = data_array_wav.bin({wavelength_edges.dim: wavelength_edges}) - try: - from orsopy import fileio - - unit = data_array_wav.coords['wavelength'].unit - # This insures that when the unit is Å it is written as - # angstrom in the ORSO object. - if unit == 'angstrom': - unit = 'angstrom' - orso_measurement = data_array_wav.attrs['orso'].value.data_source.measurement - orso_measurement.instrument_settings.wavelength = fileio.base.ValueRange( - float(data_array_wav.coords['wavelength'].min().value), - float(data_array_wav.coords['wavelength'].max().value), - unit, - ) - except ImportError: - orso.not_found_warning() - return data_array_wav + data_array_wav = data_array_wav.bin({wavelength_edges.dim: wavelength_edges}) + # TODO + # try: + # from orsopy import fileio + + # unit = data_array_wav.coords['wavelength'].unit + # # This insures that when the unit is Å it is written as + # # angstrom in the ORSO object. + # if unit == 'angstrom': + # unit = 'angstrom' + # orso_measurement = data_array_wav.attrs['orso'].value.data_source.measurement + # orso_measurement.instrument_settings.wavelength = fileio.base.ValueRange( + # float(data_array_wav.coords['wavelength'].min().value), + # float(data_array_wav.coords['wavelength'].max().value), + # unit, + # ) + # except ImportError: + # orso.not_found_warning() + return WavelengthData[Run](data_array_wav) def wavelength_to_theta( - data_array: sc.DataArray, theta_edges: sc.Variable = None, graph: dict = None -) -> sc.DataArray: + data_array: WavelengthData[Run], + theta_edges: ThetaBins, + graph: SpecularReflectionCoordTransformGraph, +) -> ThetaData[Run]: """ Use :code:`transform_coords` to find the theta values for the events and potentially add ORSO metadata. @@ -171,34 +183,33 @@ def wavelength_to_theta( : New data array with theta coordinate. """ - graph = graph if graph is not None else specular_reflection() data_array_theta = data_array.transform_coords(['theta'], graph=graph) - if theta_edges is not None: - data_array_theta = data_array_theta.bin({theta_edges.dim: theta_edges}) - try: - from orsopy import fileio - - orso_measurement = data_array_theta.attrs['orso'].value.data_source.measurement - orso_measurement.instrument_settings.incident_angle = fileio.base.ValueRange( - float(data_array_theta.coords['theta'].min().value), - float(data_array_theta.coords['theta'].max().value), - data_array_theta.bins.coords['theta'].min().unit, - ) - import inspect - - # Determine if 'gravity' is in the graph and if to add the gravity correction - if any( - [ - 'gravity' in i.parameters.keys() - for i in map(inspect.signature, graph.values()) - ] - ): - data_array_theta.attrs['orso'].value.reduction.corrections += [ - 'gravity correction' - ] - except ImportError: - orso.not_found_warning() - return data_array_theta + data_array_theta = data_array_theta.bin({theta_edges.dim: theta_edges}) + # TODO + # try: + # from orsopy import fileio + + # orso_measurement = data_array_theta.attrs['orso'].value.data_source.measurement + # orso_measurement.instrument_settings.incident_angle = fileio.base.ValueRange( + # float(data_array_theta.coords['theta'].min().value), + # float(data_array_theta.coords['theta'].max().value), + # data_array_theta.bins.coords['theta'].min().unit, + # ) + # import inspect + + # # Determine if 'gravity' is in the graph and if to add the gravity correction + # if any( + # [ + # 'gravity' in i.parameters.keys() + # for i in map(inspect.signature, graph.values()) + # ] + # ): + # data_array_theta.attrs['orso'].value.reduction.corrections += [ + # 'gravity correction' + # ] + # except ImportError: + # orso.not_found_warning() + return ThetaData[Run](data_array_theta) def theta_to_q( @@ -249,3 +260,6 @@ def sum_bins(data_array: sc.DataArray): 'angular_resolution' ].max('detector_number') return data_array_summed + + +providers = [specular_reflection, tof_to_wavelength, wavelength_to_theta] diff --git a/src/essreflectometry/reflectometry/types.py b/src/essreflectometry/reflectometry/types.py new file mode 100644 index 0000000..e8da97e --- /dev/null +++ b/src/essreflectometry/reflectometry/types.py @@ -0,0 +1,53 @@ +from typing import NewType, TypeVar + +import sciline +import scipp as sc + +Reference = NewType('Reference', str) +Sample = NewType('Sample', str) +Run = TypeVar('Run', Reference, Sample) + + +class Filename(sciline.Scope[Run, str], str): + """Filename of the raw data""" + + +class Raw(sciline.Scope[Run, sc.DataArray], sc.DataArray): + """Raw data""" + + +class WavelengthData(sciline.Scope[Run, sc.DataArray], sc.DataArray): + """Raw data transformed to wavelength""" + + +class ThetaData(sciline.Scope[Run, sc.DataArray], sc.DataArray): + """Wavelength data transformed to theta""" + + +class Experiment(sciline.Scope[Run, sc.DataArray], sc.DataArray): + """Experiment data with added coordinates: + wavelength, incidence angle, and momentum transfer""" + + +class FootprintCorrected(sciline.Scope[Run, sc.DataArray], sc.DataArray): + """Experiment data corrected by footprint on sample""" + + +CalibratedReference = NewType('CalibratedReference', sc.DataArray) +WithQResolution = NewType('WithQResolution', sc.DataArray) +Normalized = NewType('Normalized', sc.DataArray) +CountsByMomentumTransfer = NewType('CountsByMomentumTransfer', sc.DataArray) + +''' Parameters for the workflow ''' +MomentumTransferBins = NewType('MomentumTransferBins', sc.Variable) +WavelengthBins = NewType('WavelengthBins', sc.Variable) +ThetaBins = NewType('ThetaBins', sc.Variable) + + +class Rotation(sciline.Scope[Run, sc.Variable], sc.Variable): + """The rotation of the sample / the reference sample""" + + +SpecularReflectionCoordTransformGraph = NewType( + 'SpecularReflectionCoordTransformGraph', dict +)