diff --git a/.gitignore b/.gitignore index 70a1387..b382093 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ *.egg-info/ .eggs/ /doc/_generated +/doc/_static/example_nxxas_data.h5 diff --git a/README.md b/README.md index e84a352..7259230 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # pynxxas Library for reading and writing XAS data in NeXus format. +An example HDF5 file can be found [here](https://myhdf5.hdfgroup.org/view?url=https%3A%2F%2Fpynxxas.readthedocs.io%2Fen%2F7-create-a-first-format-to-nxxas-conversion%2F_static%2Fexample_nxxas_data.h5) +

@@ -10,4 +12,6 @@ Library for reading and writing XAS data in NeXus format. + +

diff --git a/doc/_ext/myhdf5_inline_role.py b/doc/_ext/myhdf5_inline_role.py new file mode 100644 index 0000000..b28377f --- /dev/null +++ b/doc/_ext/myhdf5_inline_role.py @@ -0,0 +1,50 @@ +import re +import os +from docutils import nodes +from pynxxas.io.convert import convert_files + + +def setup(app): + app.add_role("myhdf5", myhdf5_role) + app.connect("html-page-context", inject_dynamic_url_js) + app.connect("config-inited", generate_example_nxxas_data) + + +def myhdf5_role(name, rawtext, text, lineno, inliner, options={}, content=[]): + matches = re.match(r"(\S+)\s*<([^<>]+)>", text) + display_text = matches.group(1) + filename = matches.group(2) + + url_template = f"https://myhdf5.hdfgroup.org/view?url=placeholder{filename}" + + link = f'{display_text}' + + node = nodes.raw("", link, format="html") + return [node], [] + + +def inject_dynamic_url_js(app, pagename, templatename, context, doctree): + if app.builder.name != "html" or doctree is None: + return + + script = """ + + """ + + context["body"] += script + + +def generate_example_nxxas_data(app, config): + output_filename = os.path.join(app.srcdir, "_static", "example_nxxas_data.h5") + file_pattern1 = os.path.join(app.srcdir, "..", "xdi_files", "*") + file_pattern2 = os.path.join(app.srcdir, "..", "xas_beamline_data", "*") + convert_files([file_pattern1, file_pattern2], output_filename, "nexus") diff --git a/doc/conf.py b/doc/conf.py index 2519940..65445f0 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -4,8 +4,12 @@ # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information +import os +import sys from pynxxas import __version__ as release +sys.path.append(os.path.abspath("./_ext")) + project = "pynxxas" version = ".".join(release.split(".")[:2]) copyright = "2024-present, ESRF" @@ -20,6 +24,7 @@ "sphinx.ext.autosummary", "sphinx.ext.viewcode", "sphinx_autodoc_typehints", + "myhdf5_inline_role", ] 
templates_path = ["_templates"] exclude_patterns = ["build"] @@ -39,7 +44,8 @@ # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output html_theme = "pydata_sphinx_theme" -html_static_path = [] +html_static_path = ["_static"] +html_extra_path = [] html_theme_options = { "icon_links": [ { diff --git a/doc/howtoguides.rst b/doc/howtoguides.rst new file mode 100644 index 0000000..b9d659f --- /dev/null +++ b/doc/howtoguides.rst @@ -0,0 +1,7 @@ +How-to Guides +============= + +.. toctree:: + + howtoguides/install + howtoguides/convert_files diff --git a/doc/howtoguides/convert_files.rst b/doc/howtoguides/convert_files.rst new file mode 100644 index 0000000..1e6d2b7 --- /dev/null +++ b/doc/howtoguides/convert_files.rst @@ -0,0 +1,8 @@ +Convert file formats +==================== + +Convert all files in the *xdi_files* and *xas_beamline_data* to *HDF5/NeXus* format + +.. code-block:: bash + + nxxas-convert xdi_files/*.* xas_beamline_data/*.* ./converted/data.h5 diff --git a/doc/howtoguides/install.rst b/doc/howtoguides/install.rst new file mode 100644 index 0000000..69956a9 --- /dev/null +++ b/doc/howtoguides/install.rst @@ -0,0 +1,6 @@ +Install +======= + +.. code-block:: bash + + pip install pynxxas diff --git a/doc/index.rst b/doc/index.rst index 8ccdcb8..7c9e2da 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -3,7 +3,11 @@ pynxxas |version| Library for reading and writing XAS data in `NeXus format `_. +An example HDF5 file can be found :myhdf5:`here `. + .. toctree:: :hidden: + howtoguides + tutorials api diff --git a/doc/tutorials.rst b/doc/tutorials.rst new file mode 100644 index 0000000..68f6f41 --- /dev/null +++ b/doc/tutorials.rst @@ -0,0 +1,6 @@ +Tutorials +========= + +.. 
toctree:: + + tutorials/models diff --git a/doc/tutorials/models.rst b/doc/tutorials/models.rst new file mode 100644 index 0000000..8b22179 --- /dev/null +++ b/doc/tutorials/models.rst @@ -0,0 +1,34 @@ +Data models +=========== + +Data from different data formats are represented in memory as *pydantic* models. +You can convert between different models and save/load models from file. + +NeXus models +------------ + +Build an *NXxas* model instance in steps + +.. code-block:: python + + from pynxxas.models import NxXasModel + + nxxas_model = NxXasModel(element="Fe", absorption_edge="K", mode="transmission") + nxxas_model.energy = [7, 7.1], "keV" + nxxas_model.intensity = [10, 20] + +Create an *NXxas* model instance from a dictionary and convert back to a dictionary + +.. code-block:: python + + data_in = { + "NX_class": "NXsubentry", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": [[7, 7.1], "keV"], + "intensity": [10, 20], + } + + nxxas_model = NxXasModel(**data_in) + data_out = nxxas_model.model_dump() diff --git a/setup.cfg b/setup.cfg index 75dbef3..3016783 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,6 +23,13 @@ package_dir= packages=find: python_requires = >=3.8 install_requires = + typing_extensions; python_version < "3.9" + strenum; python_version < "3.11" + numpy + h5py + pydantic >=2.6 + pint + periodictable [options.packages.find] where=src @@ -40,6 +47,10 @@ doc = sphinx-autodoc-typehints >=1.16 pydata-sphinx-theme < 0.15 +[options.entry_points] +console_scripts = + nxxas-convert=pynxxas.apps.nxxas_convert:main + # E501 (line too long) ignored for now # E203 and W503 incompatible with black formatting (https://black.readthedocs.io/en/stable/compatible_configs.html#flake8) [flake8] diff --git a/src/pynxxas/apps/__init__.py b/src/pynxxas/apps/__init__.py new file mode 100644 index 0000000..2c9e51d --- /dev/null +++ b/src/pynxxas/apps/__init__.py @@ -0,0 +1,2 @@ +"""Command-Line Interface (CLI) +""" diff --git
def main(argv=None) -> int:
    """Command-line entry point of the ``nxxas-convert`` console script.

    :param argv: full argument vector including the program name at index 0;
        defaults to ``sys.argv``.
    :returns: the return code of ``convert_files`` (0 on success, non-zero when
        the conversion was aborted or at least one file failed), suitable for
        ``sys.exit``.
    """
    if argv is None:
        argv = sys.argv

    parser = argparse.ArgumentParser(
        # Matches the console script name declared in setup.cfg.
        prog="nxxas-convert",
        description="Convert data to NXxas format",
    )

    parser.add_argument(
        "--output-format",
        type=str,
        default="nexus",
        choices=list(models.MODELS),
        help="Output format",
    )

    parser.add_argument(
        "file_patterns",
        type=str,
        nargs="*",
        help="Files to convert",
    )

    parser.add_argument(
        "output_filename", type=str, help="Convert destination filename"
    )

    args = parser.parse_args(argv[1:])
    logging.basicConfig()

    # Propagate the conversion status: without this `return` the process always
    # exits with status 0, even when files could not be converted.
    return convert_files(
        args.file_patterns, args.output_filename, args.output_format, interactive=True
    )
def convert_files(
    file_patterns: Iterator[str],
    output_filename: str,
    output_format: str,
    interactive: bool = False,
) -> int:
    """Load all files matching the patterns, convert them and save the result.

    :param file_patterns: glob patterns of the input files.
    :param output_filename: destination file. For the "nexus" format all
        converted models are saved in this one file under ``/datasetNN``; for
        other formats one file per model is derived from this name.
    :param output_format: key of ``models.MODELS`` selecting the output model.
    :param interactive: ask for confirmation before overwriting an existing
        ``output_filename``.
    :returns: 0 when everything was converted, 1 when the user declined to
        overwrite or when loading/converting/saving failed for at least one file.
    :raises KeyError: when ``output_format`` is not a key of ``models.MODELS``.
    """
    model_type = models.MODELS[output_format]

    output_filename = pathlib.Path(output_filename)
    if output_filename.exists():
        if interactive:
            result = input(f"Overwrite {output_filename}? (y/[n])")
            if result.lower() not in ("y", "yes"):
                return 1
        output_filename.unlink()
    output_filename.parent.mkdir(parents=True, exist_ok=True)

    state = {"return_code": 0, "scan_number": 0, "filename": None}
    scan_number = 0
    for model_in in _iter_load_models(file_patterns, state):
        scan_number += 1
        for model_out in _iter_convert_model(model_in, model_type, state):
            # Only NeXus models carry `NX_class`/`mode`; other models must not
            # crash the whole conversion because they lack these attributes.
            is_subentry = getattr(model_out, "NX_class", None) == "NXsubentry"
            mode_suffix = ""
            if is_subentry:
                mode_suffix = str(model_out.mode).replace(" ", "_")

            if output_format == "nexus":
                output_url = f"{output_filename}?path=/dataset{scan_number:02}"
                if is_subentry:
                    output_url = f"{output_url}/{mode_suffix}"
            else:
                basename = f"{output_filename.stem}_{scan_number:02}"
                if is_subentry:
                    basename = f"{basename}_{mode_suffix}"
                # Build the file name first: `Path / str + str` raises a
                # TypeError because `/` binds tighter than `+`.
                output_url = output_filename.parent / (
                    basename + output_filename.suffix
                )

            with _handle_error("saving", state):
                io.save_model(model_out, output_url)

    return state["return_code"]
def create_hdf5_link(
    h5group: h5py.Group,
    target_name: str,
    target_filename: Optional[str],
    absolute: bool = False,
) -> Union[h5py.SoftLink, h5py.ExternalLink]:
    """Create HDF5 soft link (supports relative down paths) or external link (supports relative paths).

    :param h5group: group in which the returned link object will be stored.
    :param target_name: HDF5 name of the link target, either absolute
        (starting with "/") or relative to ``h5group``.
    :param target_filename: file containing the target. ``None`` or empty means
        the file of ``h5group``. A relative name is interpreted with respect to
        the directory of the file of ``h5group``.
    :param absolute: use absolute HDF5 names and absolute file names in the link.
    :returns: a link object that can be assigned to ``h5group[<name>]``.
    """
    # HDF5 object names always use "/" as separator, independent of the OS.
    import posixpath

    this_name = h5group.name
    this_filename = os.path.abspath(h5group.file.filename)
    this_dirname = os.path.dirname(this_filename)

    if target_filename:
        # `os.path.join` keeps `target_filename` as is when it is already absolute.
        target_filename = os.path.abspath(os.path.join(this_dirname, target_filename))
    else:
        target_filename = this_filename

    if posixpath.isabs(target_name):
        target_name = posixpath.normpath(target_name)
    else:
        target_name = posixpath.normpath(posixpath.join(this_name, target_name))
    rel_target_name = posixpath.relpath(target_name, this_name)

    # Internal link
    if os.path.normcase(target_filename) == os.path.normcase(this_filename):
        if absolute or ".." in rel_target_name.split("/"):
            # h5py.SoftLink does not support relative links upwards
            return h5py.SoftLink(target_name)
        return h5py.SoftLink(rel_target_name)

    # External link
    if absolute:
        return h5py.ExternalLink(target_filename, target_name)
    try:
        # Relative external file names are resolved with respect to the
        # directory of the linking file, not the file itself.
        rel_target_filename = os.path.relpath(target_filename, this_dirname)
    except ValueError:
        # No relative path exists (e.g. different drives on Windows).
        rel_target_filename = target_filename
    return h5py.ExternalLink(rel_target_filename, target_name)
import hdf5_utils +from ..models import nexus + + +def is_nexus_file(url: url_utils.UrlType) -> bool: + filename = url_utils.as_url(url).path + with open(filename, "rb") as file: + try: + with h5py.File(file, mode="r"): + return True + except Exception: + return False + + +def load_nexus_file(url: url_utils.UrlType) -> Generator[nexus.NxGroup, None, None]: + raise NotImplementedError(f"File format not supported: {url}") + + +def save_nexus_file(nxgroup: nexus.NxXasModel, url: url_utils.UrlType) -> None: + if not isinstance(nxgroup, nexus.NxXasModel): + raise TypeError(f"nxgroup is not of type NxXasModel ({type(nxgroup)})") + if not nxgroup.has_data(): + return + filename = url_utils.as_url(url).path + url = url_utils.as_url(url) + + with h5py.File(filename, mode="a", track_order=True) as nxroot: + nxparent = _prepare_nxparent(nxgroup, url, nxroot) + _save_nxgroup(nxgroup, nxparent) + + +def _save_nxgroup(nxgroup: nexus.NxGroup, nxparent: h5py.Group) -> None: + if not isinstance(nxgroup, nexus.NxGroup): + raise TypeError(f"nxgroup is not of type NxGroup ({type(nxgroup)})") + for field_name, field, field_value in _iter_model_fields(nxgroup): + if field_value is None: + continue + elif isinstance(field_value, nexus.NxGroup): + nxchild = nxparent.require_group(field_name) + _save_nxgroup(field_value, nxchild) + if isinstance(field_value, nexus.NxDataModel): + _set_default(nxchild) + elif field.alias and field.alias.startswith("@"): + try: + _save_attribute(nxparent, field_name, field_value) + except Exception as e: + raise ValueError( + f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 attribute" + ) from e + else: + try: + _save_dataset(nxparent, field_name, field_value) + except Exception as e: + raise ValueError( + f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 dataset" + ) from e + + +def _iter_model_fields( + model: pydantic.BaseModel, +) -> Generator[Tuple[str, pydantic.Field, Any], None, None]: + for 
def _save_dataset(nxparent: h5py.Group, field_name: str, field_value: Any) -> None:
    """Save a model field as the HDF5 dataset (or link) `field_name` in `nxparent`.

    - `NxField`: its `value` becomes the dataset, fields with an "@" alias
      become attributes of that dataset.
    - `StrEnum`: saved as a plain string.
    - `pint.Quantity`: the magnitude becomes the dataset and the units the
      "units" attribute; empty quantities are not saved.
    - `NxLinkModel`: saved as an HDF5 soft or external link.
    - anything else is handed to h5py as is.
    """
    if isinstance(field_value, nexus.NxField):
        nxparent[field_name] = field_value.value
        for attr_name, attr, attr_value in _iter_model_fields(field_value):
            if attr_value is None:
                # HDF5 attributes cannot hold None; unset optional attributes
                # are skipped like unset fields are skipped in `_save_nxgroup`.
                continue
            if attr.alias and attr.alias.startswith("@"):
                nxparent[field_name].attrs[attr_name] = attr_value
    elif isinstance(field_value, StrEnum):
        nxparent[field_name] = str(field_value)
    elif isinstance(field_value, pint.Quantity):
        if field_value.size:
            nxparent[field_name] = field_value.magnitude
            units = str(field_value.units)
            if units:
                nxparent[field_name].attrs["units"] = units
    elif isinstance(field_value, nexus.NxLinkModel):
        link = hdf5_utils.create_hdf5_link(
            nxparent, field_value.target_name, field_value.target_filename
        )
        nxparent[field_name] = link
    else:
        nxparent[field_name] = field_value
class ParsedUrlType(NamedTuple):
    """A file URL split into the file path and the path inside the file."""

    path: str  # file system path
    internal_path: str  # value of the "path" query parameter ("" when absent)


UrlType = Union[str, pathlib.Path, urllib.parse.ParseResult, ParsedUrlType]


_WIN32 = sys.platform == "win32"


def _query_internal_path(query: str) -> str:
    """Return the first "path" parameter of a URL query string ("" when absent)."""
    return urllib.parse.parse_qs(query).get("path", [""])[0]


def as_url(url: UrlType) -> ParsedUrlType:
    """Normalize a file name or "file" URL to a `ParsedUrlType`.

    Accepted inputs:

    - an already parsed `ParsedUrlType` (returned unchanged)
    - a `urllib.parse.ParseResult` with scheme "file"
    - a "file://" URL string
    - a plain (relative or absolute) file name, optionally followed by
      ``?path=<internal path>``; the file name is made absolute and uses "/"
      as separator.

    :raises ValueError: when the URL has a scheme other than "file".
    """
    if isinstance(url, ParsedUrlType):
        return url

    if isinstance(url, urllib.parse.ParseResult):
        parsed = url
    else:
        url_str = str(url)
        parsed = urllib.parse.urlparse(url_str)
        # A one-letter "scheme" on Windows is a drive letter.
        if not parsed.scheme or (_WIN32 and len(parsed.scheme) == 1):
            # Plain file name: split off the query *before* touching the file
            # system path so that the internal path is not normalized as if it
            # were part of the file name and "#" in file names is preserved.
            filename, _, query = url_str.partition("?")
            path = os.path.abspath(filename).replace("\\", "/")
            return ParsedUrlType(path=path, internal_path=_query_internal_path(query))

    if parsed.scheme != "file":
        raise ValueError("URL is not a file")

    if parsed.netloc:
        path = f"{parsed.netloc}{parsed.path}"
    else:
        path = parsed.path

    return ParsedUrlType(path=path, internal_path=_query_internal_path(parsed.query))
+import re +import datetime +from typing import Union, Tuple, Optional, Generator + +import pint +import numpy + +from . import url_utils +from ..models import units +from ..models.xdi import XdiModel + + +def is_xdi_file(url: url_utils.UrlType) -> bool: + filename = url_utils.as_url(url).path + with open(filename, "r") as file: + try: + for line in file: + line = line.strip() + if not line: + continue + return line.startswith("# XDI") + except Exception: + return False + + +def load_xdi_file(url: url_utils.UrlType) -> Generator[XdiModel, None, None]: + """Specs described in + + https://github.com/XraySpectroscopy/XAS-Data-Interchange/blob/master/specification/spec.md + """ + filename = url_utils.as_url(url).path + content = {"comments": [], "column": dict(), "data": dict()} + + with open(filename, "r") as file: + # Version: first non-empty line + for line in file: + line = line.strip() + if not line: + continue + if not line.startswith("# XDI"): + raise ValueError(f"XDI file does not start with '# XDI': '{filename}'") + break + + # Fields and comments: lines starting with "#" + is_comment = False + for line in file: + line = line.strip() + + if not line.startswith("#"): + raise ValueError(f"Invalid XDI header line: '{line}'") + + if _XDI_HEADER_END_REGEX.match(line): + break + + if _XDI_FIELDS_END_REGEX.match(line): + # Next lines in the header are user comments + is_comment = True + continue + + if is_comment: + match_comment = _XDI_COMMENT_REGEX.match(line) + if not match_comment: + continue + (comment,) = match_comment.groups() + content["comments"].append(comment) + continue + + match_namespace = _XDI_FIELD_REGEX.match(line) + if match_namespace: + key, value = match_namespace.groups() + value = _parse_xdi_value(value) + key_parts = key.split(".") + if len(key_parts) > 1: + namespace, key = key_parts + namespace = namespace.lower() + key = key.lower() + key = _parse_xdi_value(key) + if namespace not in content: + content[namespace] = {} + 
content[namespace][key] = value + else: + key = key_parts[0] + key = _parse_xdi_value(key) + content[key] = value + + # Data + table = numpy.loadtxt(filename, dtype=float) + columns = [ + name + for _, name in sorted(content.pop("column").items(), key=lambda tpl: tpl[0]) + ] + for name, array in zip(columns, table.T): + name, quant = _parse_xdi_column_name(name) + content["data"][name] = array, quant + + yield XdiModel(**content) + + +def save_xdi_file(model_instance: XdiModel, url: url_utils.UrlType) -> None: + raise NotImplementedError( + f"Saving of {type(model_instance).__name__} not implemented" + ) + + +_XDI_FIELD_REGEX = re.compile(r"#\s*([\w.]+):\s*(.*)") +_XDI_COMMENT_REGEX = re.compile(r"#\s*(.*)") +_XDI_HEADER_END_REGEX = re.compile(r"#\s*-") +_XDI_FIELDS_END_REGEX = re.compile(r"#\s*///") +_NUMBER_REGEX = re.compile(r"(?=.)([+-]?([0-9]*)(\.([0-9]+))?)([eE][+-]?\d+)?\s+\w+") +_SPACES_REGEX = re.compile(r"\s+") + + +def _parse_xdi_value( + value: str, +) -> Union[str, datetime.datetime, pint.Quantity, Tuple[str, pint.Quantity]]: + # Dimensionless integral number + try: + return units.as_quantity(int(value)) + except ValueError: + pass + + # Dimensionless decimal number + try: + return units.as_quantity(float(value)) + except ValueError: + pass + + # Date and time + try: + return datetime.datetime.fromisoformat(value) + except ValueError: + pass + + # Number with units + if _NUMBER_REGEX.match(value): + try: + return units.as_quantity(value) + except pint.UndefinedUnitError: + pass + + return value + + +def _parse_xdi_column_name( + name: str, +) -> Union[Tuple[str, Optional[str]]]: + parts = _SPACES_REGEX.split(name) + if len(parts) == 1: + return name, None + try: + units.as_units(parts[-1]) + except pint.UndefinedUnitError: + return name, None + name = " ".join(parts[:-1]) + return name, parts[-1] diff --git a/src/pynxxas/models/__init__.py b/src/pynxxas/models/__init__.py new file mode 100644 index 0000000..9ef6a64 --- /dev/null +++ 
def convert_model(
    instance: "pydantic.BaseModel", model_type: "Type[pydantic.BaseModel]"
) -> "Generator[pydantic.BaseModel, None, None]":
    """Convert a model instance to one or more instances of `model_type`.

    The conversion goes through the NXxas model: the module registered for the
    type of `instance` converts it to NXxas models and the module registered
    for `model_type` converts each of those to the requested type.

    :param instance: the model to convert.
    :param model_type: the requested model class.
    :returns: generator of converted models; `instance` itself when it already
        is of the requested type.
    :raises NotImplementedError: when no conversion module is registered for
        the type of `instance` or for `model_type`.
    """
    if isinstance(instance, model_type):
        yield instance
        # Nothing to convert. Without this `return` the instance would be
        # converted as well and therefore yielded a second time.
        return

    mod_to = _CONVERT_MODULE.get(type(instance))
    mod_from = _CONVERT_MODULE.get(model_type)
    if mod_to is None or mod_from is None:
        raise NotImplementedError(
            f"Conversion from {type(instance).__name__} to {model_type.__name__} is not implemented"
        )

    for nxxas_model in mod_to.to_nxxas(instance):
        yield from mod_from.from_nxxas(nxxas_model)
import NxXasModel + + +def to_nxxas(xdi_model: XdiModel) -> Generator[NxXasModel, None, None]: + has_mu = xdi_model.data.mutrans is not None or xdi_model.data.normtrans is not None + has_fluo = ( + xdi_model.data.mufluor is not None or xdi_model.data.normfluor is not None + ) + if not has_mu and not has_fluo: + return + + data = { + "element": xdi_model.element.symbol, + "absorption_edge": xdi_model.element.edge, + } + + if has_mu and has_fluo: + data["NX_class"] = "NXsubentry" + else: + data["NX_class"] = "NXentry" + + if xdi_model.facility and xdi_model.facility.name: + if xdi_model.beamline and xdi_model.beamline.name: + name = { + "value": f"{xdi_model.facility.name}-{xdi_model.beamline.name}", + "@short_name": xdi_model.beamline.name, + } + else: + name = {"value": xdi_model.facility.name} + data["instrument"] = {"name": name} + + if has_mu: + nxxas_model = NxXasModel(mode="transmission", **data) + nxxas_model.energy = xdi_model.data.energy + if xdi_model.data.mutrans is not None: + nxxas_model.intensity = xdi_model.data.mutrans + else: + nxxas_model.intensity = xdi_model.data.normtrans + yield nxxas_model + + if has_fluo: + nxxas_model = NxXasModel(mode="fluorescence yield", **data) + nxxas_model.energy = xdi_model.data.energy + if xdi_model.data.mufluor is not None: + nxxas_model.intensity = xdi_model.data.mufluor + else: + nxxas_model.intensity = xdi_model.data.normfluor + yield nxxas_model + + +def from_nxxas(nxxas_model: NxXasModel) -> Generator[XdiModel, None, None]: + xdi_model = XdiModel() + xdi_model.element.symbol = nxxas_model.element + xdi_model.element.edge = nxxas_model.absorption_edge + xdi_model.data.energy = nxxas_model.energy + if nxxas_model.mode == "transmission": + xdi_model.data.mutrans = nxxas_model.intensity + elif nxxas_model.mode == "fluorescence yield": + xdi_model.data.mufluor = nxxas_model.intensity + yield xdi_model diff --git a/src/pynxxas/models/nexus.py b/src/pynxxas/models/nexus.py new file mode 100644 index 0000000..40b698f 
class NxInstrumentName(NxField):
    """Name of the instrument with an optional abbreviated name."""

    value: Optional[str]
    # The "@" alias marks this field as an attribute of the "name" field.
    # An explicit default is needed: in pydantic 2 an `Optional` field without
    # default is still required, while the XDI conversion builds the name
    # without "@short_name" when only the facility is known.
    short_name: Optional[str] = pydantic.Field(default=None, alias="@short_name")


class NxInstrument(NxClass, NxGroup, nx_class="NXinstrument"):
    """Instrument group holding the instrument name."""

    # The NeXus base class is spelled "NXinstrument"; this value is what ends
    # up as the `NX_class` attribute of the saved group.
    NX_class: Literal["NXinstrument"] = pydantic.Field(
        default="NXinstrument", alias="@NX_class"
    )
    name: Optional[NxInstrumentName] = None
"NXxas" + mode: NxXasMode + element: ChemicalElement + absorption_edge: XRayCoreExcitationState + energy: units.PydanticQuantity = units.as_quantity([]) + intensity: units.PydanticQuantity = units.as_quantity([]) + title: Optional[str] = None + plot: Optional[NxDataModel] = None + instrument: Optional[NxInstrument] = None + + @pydantic.model_validator(mode="after") + def set_title(self) -> "NxXasModel": + if self.element is not None and self.absorption_edge is not None: + title = f"{self.element} {self.absorption_edge}" + if self.instrument is not None and self.instrument.name is not None: + title = f"{self.instrument.name.value}: {title}" + self.title = f"{title} ({self.mode})" + if self.plot is None: + energy = NxLinkModel(target_name="../energy") + intensity = NxLinkModel(target_name="../intensity") + self.plot = NxDataModel(energy=energy, intensity=intensity) + return self + + def has_data(self) -> bool: + return bool(self.energy.size and self.intensity.size) diff --git a/src/pynxxas/models/units.py b/src/pynxxas/models/units.py new file mode 100644 index 0000000..82cecb3 --- /dev/null +++ b/src/pynxxas/models/units.py @@ -0,0 +1,84 @@ +import pint +import pydantic +from pydantic_core import core_schema +from pydantic.json_schema import JsonSchemaValue + +from typing import Any, Sequence, Union, List + +try: + from typing import Annotated +except ImportError: + from typing_extensions import Annotated + +_REGISTRY = pint.UnitRegistry() +_REGISTRY.default_format = "~" # unit symbols instead of full unit names + + +def as_quantity(value: Union[str, pint.Quantity, Sequence]) -> pint.Quantity: + if isinstance(value, pint.Quantity): + return value + if ( + isinstance(value, Sequence) + and len(value) == 2 + and (isinstance(value[1], str) or value[1] is None) + ): + value, units = value + else: + units = None + return _REGISTRY.Quantity(value, units) + + +def as_units(value: Union[str, pint.Unit]) -> pint.Unit: + if isinstance(value, pint.Unit): + return value + 
class _QuantityPydanticAnnotation:
    """Pydantic annotation that validates with `as_quantity` and serializes to
    ``[magnitude, units]``.
    """

    # https://docs.pydantic.dev/latest/concepts/types/#handling-third-party-types

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        _source_type: Any,
        _handler: pydantic.GetCoreSchemaHandler,
    ) -> core_schema.CoreSchema:
        def serialize(value: Any) -> List:
            value = as_quantity(value)
            magnitude = value.magnitude
            # Array-like magnitudes are converted to plain Python objects.
            # Scalar magnitudes (int, float) have no `tolist` and are kept as is.
            if hasattr(magnitude, "tolist"):
                magnitude = magnitude.tolist()
            return [magnitude, str(value.units)]

        json_schema = core_schema.chain_schema(
            [
                core_schema.no_info_plain_validator_function(as_quantity),
            ]
        )

        return core_schema.json_or_python_schema(
            json_schema=json_schema,
            python_schema=core_schema.union_schema(
                [
                    # check if it's an instance first before doing any further work
                    core_schema.is_instance_schema(pint.Quantity),
                    json_schema,
                ]
            ),
            serialization=core_schema.plain_serializer_function_ser_schema(serialize),
        )

    @classmethod
    def __get_pydantic_json_schema__(
        cls,
        _core_schema: core_schema.CoreSchema,
        handler: pydantic.GetJsonSchemaHandler,
    ) -> JsonSchemaValue:
        # Advertised JSON schema: a float or a list of floats.
        return handler(
            core_schema.union_schema(
                [
                    core_schema.float_schema(),
                    core_schema.list_schema(core_schema.float_schema()),
                ]
            )
        )
import units + + +class XdiBaseModel(pydantic.BaseModel, extra="allow"): + pass + + +class XdiFacilityNamespace(XdiBaseModel): + name: Optional[str] = None + energy: Optional[units.PydanticQuantity] = None + current: Optional[units.PydanticQuantity] = None + xray_source: Optional[str] = None + + +class XdiBeamlineNamespace(XdiBaseModel): + name: Optional[str] = None + collimation: Optional[str] = None + focusing: Optional[str] = None + harmonic_rejection: Optional[str] = None + + +class XdiMonoNamespace(XdiBaseModel): + name: Optional[str] = None + d_spacing: Optional[units.PydanticQuantity] = None + + +class XdiDetectorNamespace(XdiBaseModel): + i0: Optional[str] = None + it: Optional[str] = None + ifluo: Optional[str] = None + ir: Optional[str] = None + + @pydantic.model_validator(mode="before") + @classmethod + def rename_if(cls, data: Any) -> Any: + if not isinstance(data, Mapping): + return data + if "if" in data: + data = dict(data) + data["ifluo"] = data["if"] + return data + + +class XdiSampleNamespace(XdiBaseModel): + name: Optional[str] = None + id: Optional[str] = None + stoichiometry: Optional[str] = None + prep: Optional[str] = None + experimenters: Optional[str] = None + temperature: Optional[units.PydanticQuantity] = None + pressure: Optional[units.PydanticQuantity] = None + ph: Optional[units.PydanticQuantity] = None + eh: Optional[units.PydanticQuantity] = None + volume: Optional[units.PydanticQuantity] = None + porosity: Optional[units.PydanticQuantity] = None + density: Optional[units.PydanticQuantity] = None + concentration: Optional[units.PydanticQuantity] = None + resistivity: Optional[units.PydanticQuantity] = None + viscosity: Optional[units.PydanticQuantity] = None + electric_field: Optional[units.PydanticQuantity] = None + magnetic_field: Optional[units.PydanticQuantity] = None + magnetic_moment: Optional[units.PydanticQuantity] = None + crystal_structure: Optional[units.PydanticQuantity] = None + opacity: Optional[units.PydanticQuantity] 
= None + electrochemical_potential: Optional[units.PydanticQuantity] = None + + +class XdiScanNamespace(XdiBaseModel): + start_time: Optional[datetime.datetime] = None + end_time: Optional[datetime.datetime] = None + edge_energy: Optional[units.PydanticQuantity] = None + + +class XdiElementNamespace(XdiBaseModel): + symbol: Optional[str] = None + edge: Optional[str] = None + reference: Optional[str] = None + ref_edge: Optional[str] = None + + +class XdiData(XdiBaseModel): + energy: Optional[units.PydanticQuantity] = None + angle: Optional[units.PydanticQuantity] = None + i0: Optional[units.PydanticQuantity] = None + itrans: Optional[units.PydanticQuantity] = None + ifluor: Optional[units.PydanticQuantity] = None + irefer: Optional[units.PydanticQuantity] = None + mutrans: Optional[units.PydanticQuantity] = None + mufluor: Optional[units.PydanticQuantity] = None + murefer: Optional[units.PydanticQuantity] = None + normtrans: Optional[units.PydanticQuantity] = None + normfluor: Optional[units.PydanticQuantity] = None + normrefer: Optional[units.PydanticQuantity] = None + k: Optional[units.PydanticQuantity] = None + chi: Optional[units.PydanticQuantity] = None + chi_mag: Optional[units.PydanticQuantity] = None + chi_pha: Optional[units.PydanticQuantity] = None + chi_re: Optional[units.PydanticQuantity] = None + chi_im: Optional[units.PydanticQuantity] = None + r: Optional[units.PydanticQuantity] = None + chir_mag: Optional[units.PydanticQuantity] = None + chir_pha: Optional[units.PydanticQuantity] = None + chir_re: Optional[units.PydanticQuantity] = None + chir_im: Optional[units.PydanticQuantity] = None + + +class XdiModel(XdiBaseModel): + element: XdiElementNamespace = XdiElementNamespace() + scan: XdiScanNamespace = XdiScanNamespace() + mono: XdiMonoNamespace = XdiMonoNamespace() + beamline: XdiBeamlineNamespace = XdiBeamlineNamespace() + facility: XdiFacilityNamespace = XdiFacilityNamespace() + detector: XdiDetectorNamespace = XdiDetectorNamespace() + sample: 
XdiSampleNamespace = XdiSampleNamespace() + comments: List[str] = list() + data: XdiData = XdiData() diff --git a/src/pynxxas/tests/conftest.py b/src/pynxxas/tests/conftest.py new file mode 100644 index 0000000..142e104 --- /dev/null +++ b/src/pynxxas/tests/conftest.py @@ -0,0 +1,61 @@ +import pytest +from ..models import NxXasModel +from ..io.xdi import load_xdi_file + + +@pytest.fixture() +def xdi_file(tmp_path): + filename = tmp_path / "data.xdi" + with open(filename, "w") as fh: + fh.write(_XDI_CONTENT) + return filename + + +@pytest.fixture() +def xdi_model(xdi_file): + return next(load_xdi_file(xdi_file)) + + +@pytest.fixture() +def nxxas_model(): + return NxXasModel(**_NXXAS_CONTENT) + + +_NXXAS_CONTENT = { + "element": "Co", + "absorption_edge": "K", + "mode": "transmission", + "energy": [[7509, 7519], "eV"], + "intensity": [[-0.51329170, -0.78493490], ""], +} + +_XDI_CONTENT = """ +# XDI/1.0 GSE/1.0 +# Column.1: energy eV +# Column.2: mutrans +# Column.3: i0 +# Element.edge: K +# Element.symbol: Co +# Scan.edge_energy: 7709.0 +# Mono.name: Si 111 +# Mono.d_spacing: 3.13555 +# Beamline.name: 13-ID-C +# Beamline.collimation: none +# Beamline.harmonic_rejection: detuned +# Facility.name: APS +# Facility.energy: 7.00 GeV +# Facility.xray_source: APS undulator A +# Scan.start_time: 2001-06-26T21:21:20 +# Detector.I0: 10cm N2 +# Detector.I1: 10cm N2 +# Sample.name: Co metal foil +# Sample.prep: standard foil (Joe Wong boxed set) +# /// +# room temperature +# measured at beamline 13-ID-C +# vert slits = 0.3 x 0.3mm (at ~50m) +#---- +# energy mutrans i0 + 7509.0000 -0.51329170 165872.70 + 7519.0000 -0.78493490 161255.70 +""" diff --git a/src/pynxxas/tests/test_convert.py b/src/pynxxas/tests/test_convert.py new file mode 100644 index 0000000..29303e0 --- /dev/null +++ b/src/pynxxas/tests/test_convert.py @@ -0,0 +1,54 @@ +from .. 
import models +from ..models import convert + + +def test_xdi_to_xdi(xdi_model): + xdi_model = next(convert.convert_model(xdi_model, models.XdiModel)) + _assert_model(xdi_model) + + +def test_nxxas_to_nxxas(nxxas_model): + nxxas_model = next(convert.convert_model(nxxas_model, models.NxXasModel)) + _assert_model(nxxas_model) + + +def test_xdi_to_nexus(xdi_model): + nxxas_model = next(convert.convert_model(xdi_model, models.NxXasModel)) + _assert_model(nxxas_model) + + +def test_nexus_to_xdi(nxxas_model): + xdi_model = next(convert.convert_model(nxxas_model, models.XdiModel)) + _assert_model(xdi_model) + + +def _assert_xdi_model(xdi_model: models.XdiModel): + xdi_model.element.symbol = "Co" + assert str(xdi_model.data.energy.units) == "eV" + + assert xdi_model.data.energy.magnitude.tolist() == [7509, 7519] + assert str(xdi_model.data.energy.units) == "eV" + + assert xdi_model.data.mutrans.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(xdi_model.data.mutrans.units) == "" + + +def _assert_nxxas_model(xdi_model: models.NxXasModel): + xdi_model.element = "Co" + assert str(xdi_model.energy.units) == "eV" + + assert xdi_model.energy.magnitude.tolist() == [7509, 7519] + assert str(xdi_model.energy.units) == "eV" + + assert xdi_model.intensity.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(xdi_model.intensity.units) == "" + + +_ASSERT_MODEL = { + models.XdiModel: _assert_xdi_model, + models.NxXasModel: _assert_nxxas_model, +} + + +def _assert_model(model_instance): + _ASSERT_MODEL[type(model_instance)](model_instance) diff --git a/src/pynxxas/tests/test_nexus.py b/src/pynxxas/tests/test_nexus.py new file mode 100644 index 0000000..672471b --- /dev/null +++ b/src/pynxxas/tests/test_nexus.py @@ -0,0 +1,72 @@ +from ..models import NxXasModel + + +def test_nxxas(): + data = { + "@NX_class": "NXsubentry", + "definition": "NXxas", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": [[7, 7.1], "keV"], + "intensity": [10, 
20], + } + model_instance = NxXasModel(**data) + + expected = _expected_content("NXsubentry", [[7, 7.1], "keV"], [[10, 20], ""]) + assert model_instance.model_dump() == expected + + +def test_nxxas_defaults(): + data = { + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + } + model_instance = NxXasModel(**data) + + expected = _expected_content("NXentry", [[], ""], [[], ""]) + assert model_instance.model_dump() == expected + + +def test_nxxas_fill_data(): + data = { + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + } + model_instance = NxXasModel(**data) + model_instance.energy = [7, 7.1], "keV" + model_instance.intensity = [10, 20] + + expected = _expected_content("NXentry", [[7, 7.1], "keV"], [[10, 20], ""]) + assert model_instance.model_dump() == expected + + +def _expected_content(nx_class, energy, intensity): + return { + "NX_class": nx_class, + "definition": "NXxas", + "mode": "transmission", + "element": "Fe", + "absorption_edge": "K", + "energy": energy, + "intensity": intensity, + "title": "Fe K (transmission)", + "instrument": None, + "plot": { + "NX_class": "NXdata", + "axes": [ + "energy", + ], + "energy": { + "target_filename": None, + "target_name": "../energy", + }, + "intensity": { + "target_filename": None, + "target_name": "../intensity", + }, + "signal": "intensity", + }, + } diff --git a/src/pynxxas/tests/test_todo.py b/src/pynxxas/tests/test_todo.py deleted file mode 100644 index 4f6c6c3..0000000 --- a/src/pynxxas/tests/test_todo.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_todo(): - pass diff --git a/src/pynxxas/tests/test_units.py b/src/pynxxas/tests/test_units.py new file mode 100644 index 0000000..8e994a7 --- /dev/null +++ b/src/pynxxas/tests/test_units.py @@ -0,0 +1,27 @@ +import pint +import numpy +from pydantic import TypeAdapter + + +from ..models import units + + +def test_pydantic_quantity(): + ta = TypeAdapter(units.PydanticQuantity) + _assert_quantity_equal(ta, 10) + 
_assert_quantity_equal(ta, 10.5) + _assert_quantity_equal(ta, 10.5, "eV") + _assert_quantity_equal(ta, [10.5, 11.5]) + _assert_quantity_equal(ta, [10.5, 11.5], "eV") + _assert_quantity_equal(ta, [10.5, 11.5], None) + + +def _assert_quantity_equal(ta: TypeAdapter, *args): + expected = units.as_quantity(args) + + validated = ta.validate_python(args) + assert isinstance(expected, pint.Quantity) + numpy.testing.assert_equal(validated.magnitude, expected.magnitude) + assert str(validated.units) == str(expected.units) + + validated = ta.validate_python(expected) diff --git a/src/pynxxas/tests/test_xdi.py b/src/pynxxas/tests/test_xdi.py new file mode 100644 index 0000000..bee2bf3 --- /dev/null +++ b/src/pynxxas/tests/test_xdi.py @@ -0,0 +1,33 @@ +from ..io import xdi + + +def test_is_xdi(xdi_file): + assert xdi.is_xdi_file(xdi_file) + + +def test_load_xdi_file(xdi_file): + models = list(xdi.load_xdi_file(xdi_file)) + assert len(models) == 1 + model_instance = models[0] + + # Fields + assert model_instance.facility.energy.magnitude == 7 + assert str(model_instance.facility.energy.units) == "GeV" + + # User comments + comments = [ + "room temperature", + "measured at beamline 13-ID-C", + "vert slits = 0.3 x 0.3mm (at ~50m)", + ] + assert model_instance.comments == comments + + # XAS data + assert model_instance.data.energy.magnitude.tolist() == [7509, 7519] + assert str(model_instance.data.energy.units) == "eV" + + assert model_instance.data.mutrans.magnitude.tolist() == [-0.51329170, -0.78493490] + assert str(model_instance.data.mutrans.units) == "" + + assert model_instance.data.i0.magnitude.tolist() == [165872.70, 161255.70] + assert str(model_instance.data.i0.units) == ""