diff --git a/.gitignore b/.gitignore
index ff21c1627..7867d7665 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,22 @@ __pycache__/
 build/
 makelog.txt
-# Unknown
-/python/
-__github_creds__.txt
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 000000000..20485f628
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,4 @@
+recursive-include applications/ *.nxdl.xml
+recursive-include contributed_definitions/ *.nxdl.xml
+recursive-include base_classes/ *.nxdl.xml
+include ./ *.xsd
\ No newline at end of file
diff --git a/Makefile b/Makefile
index ae556d733..113e29db8 100644
--- a/Makefile
+++ b/Makefile
@@ -6,6 +6,7 @@
 PYTHON = python3
 SPHINX = sphinx-build
 BUILD_DIR = "build"
+NXDL_DIRS := contributed_definitions applications base_classes
 
 .PHONY: help install style autoformat test clean prepare html pdf impatient-guide all local
 
@@ -49,6 +50,9 @@ test ::
 
 clean ::
	$(RM) -rf $(BUILD_DIR)
+	for dir in $(NXDL_DIRS); do\
+		$(RM) -rf $${dir}/nyaml;\
+	done
 
 prepare ::
	$(PYTHON) -m dev_tools manual --prepare --build-root $(BUILD_DIR)
@@ -83,6 +87,15 @@ all ::
	@echo "HTML built: `ls -lAFgh $(BUILD_DIR)/manual/build/html/index.html`"
	@echo "PDF built: `ls -lAFgh $(BUILD_DIR)/manual/build/latex/nexus.pdf`"
 
+NXDLS := $(foreach dir,$(NXDL_DIRS),$(wildcard $(dir)/*.nxdl.xml))
+nyaml : $(DIRS) $(NXDLS)
+	for file in $^; do\
+		mkdir -p "$${file%/*}/nyaml";\
+		nyaml2nxdl --input-file $${file};\
+		FNAME=$${file##*/};\
+		mv -- "$${file%.nxdl.xml}_parsed.yaml" "$${file%/*}/nyaml/$${FNAME%.nxdl.xml}.yaml";\
+	done
+
 # NeXus - Neutron and X-ray Common Data Format
 #
diff --git a/dev_tools/docs/nxdl.py b/dev_tools/docs/nxdl.py
index 8da3ebbc0..4c36ef277 100644
--- a/dev_tools/docs/nxdl.py
+++ b/dev_tools/docs/nxdl.py
@@ -12,6 +12,7 @@
 from ..globals.errors import NXDLParseError
 from ..globals.nxdl import NXDL_NAMESPACE
 from ..globals.urls import REPO_URL
+from ..utils import nxdl_utils as pynxtools_nxlib
 from ..utils.types import PathLike
 from .anchor_list import AnchorRegistry
 
@@ -109,7 +110,7 @@ def _parse_nxdl_file(self, nxdl_file: Path):
         # print official description of this class
         self._print("")
         self._print("**Description**:\n")
-        self._print_doc(self._INDENTATION_UNIT, ns, root, required=True)
+        self._print_doc_enum("", ns, root, required=True)
 
         # print symbol list
         node_list = root.xpath("nx:symbols", namespaces=ns)
@@ -119,7 +120,7 @@ def _parse_nxdl_file(self, nxdl_file: Path):
         elif len(node_list) > 1:
             raise Exception(f"Invalid symbol table in {nxclass_name}")
         else:
-            self._print_doc(self._INDENTATION_UNIT, ns, node_list[0])
+            self._print_doc_enum("", ns, node_list[0])
             for node in node_list[0].xpath("nx:symbol", namespaces=ns):
                 doc = self._get_doc_line(ns, node)
                 self._print(f"  **{node.get('name')}**", end="")
@@ -498,6 +499,35 @@ def _print_doc(self, indent, ns, node, required=False):
                 self._print(f"{indent}{line}")
             self._print()
 
+    def long_doc(self, ns, node):
+        length = 0
+        line = "documentation"
+        fnd = False
+        blocks = self._get_doc_blocks(ns, node)
+        for block in blocks:
+            lines = block.splitlines()
+            length += len(lines)
+            for single_line in lines:
+                if len(single_line) > 2 and single_line[0] != "." and not fnd:
+                    fnd = True
+                    line = single_line
+        return (length, line, blocks)
+
+    def _print_doc_enum(self, indent, ns, node, required=False):
+        collapse_indent = indent
+        node_list = node.xpath("nx:enumeration", namespaces=ns)
+        (doclen, line, blocks) = self.long_doc(ns, node)
+        if len(node_list) + doclen > 1:
+            collapse_indent = f"{indent} "
+            self._print(f"{indent}{self._INDENTATION_UNIT}.. collapse:: {line} ...\n")
+        self._print_doc(
+            collapse_indent + self._INDENTATION_UNIT, ns, node, required=required
+        )
+        if len(node_list) == 1:
+            self._print_enumeration(
+                collapse_indent + self._INDENTATION_UNIT, ns, node_list[0]
+            )
+
     def _print_attribute(self, ns, kind, node, optional, indent, parent_path):
         name = node.get("name")
         index_name = name
@@ -506,12 +536,9 @@
         )
         self._print(f"{indent}.. index:: {index_name} ({kind} attribute)\n")
         self._print(
-            f"{indent}**@{name}**: {optional}{self._format_type(node)}{self._format_units(node)}\n"
+            f"{indent}**@{name}**: {optional}{self._format_type(node)}{self._format_units(node)} {self.get_first_parent_ref(f'{parent_path}/{name}', 'attribute')}\n"
         )
-        self._print_doc(indent + self._INDENTATION_UNIT, ns, node)
-        node_list = node.xpath("nx:enumeration", namespaces=ns)
-        if len(node_list) == 1:
-            self._print_enumeration(indent + self._INDENTATION_UNIT, ns, node_list[0])
+        self._print_doc_enum(indent, ns, node)
 
     def _print_if_deprecated(self, ns, node, indent):
         deprecated = node.get("deprecated", None)
@@ -549,17 +576,12 @@
                 f"{self._format_type(node)}"
                 f"{dims}"
                 f"{self._format_units(node)}"
+                f" {self.get_first_parent_ref(f'{parent_path}/{name}', 'field')}"
                 "\n"
             )
             self._print_if_deprecated(ns, node, indent + self._INDENTATION_UNIT)
-            self._print_doc(indent + self._INDENTATION_UNIT, ns, node)
-
-            node_list = node.xpath("nx:enumeration", namespaces=ns)
-            if len(node_list) == 1:
-                self._print_enumeration(
-                    indent + self._INDENTATION_UNIT, ns, node_list[0]
-                )
+            self._print_doc_enum(indent, ns, node)
 
             for subnode in node.xpath("nx:attribute", namespaces=ns):
                 optional = self._get_required_or_optional_text(subnode)
@@ -585,10 +607,12 @@
             # target = hTarget.replace(".. _", "").replace(":\n", "")
_", "").replace(":\n", "") # TODO: https://github.com/nexusformat/definitions/issues/1057 self._print(f"{indent}{hTarget}") - self._print(f"{indent}**{name}**: {optional_text}{typ}\n") + self._print( + f"{indent}**{name}**: {optional_text}{typ} {self.get_first_parent_ref(f'{parent_path}/{name}', 'group')}\n" + ) self._print_if_deprecated(ns, node, indent + self._INDENTATION_UNIT) - self._print_doc(indent + self._INDENTATION_UNIT, ns, node) + self._print_doc_enum(indent, ns, node) for subnode in node.xpath("nx:attribute", namespaces=ns): optional = self._get_required_or_optional_text(subnode) @@ -619,8 +643,49 @@ def _print_full_tree(self, ns, parent, name, indent, parent_path): f"(suggested target: ``{node.get('target')}``)" "\n" ) - self._print_doc(indent + self._INDENTATION_UNIT, ns, node) + self._print_doc_enum(indent, ns, node) def _print(self, *args, end="\n"): # TODO: change instances of \t to proper indentation self._rst_lines.append(" ".join(args) + end) + + def get_first_parent_ref(self, path, tag): + nx_name = path[1 : path.find("/", 1)] + path = path[path.find("/", 1) :] + + try: + parents = pynxtools_nxlib.get_inherited_nodes(path, nx_name)[2] + except FileNotFoundError: + return "" + if len(parents) > 1: + parent = parents[1] + parent_path = parent_display_name = parent.attrib["nxdlpath"] + parent_path_segments = parent_path[1:].split("/") + parent_def_name = parent.attrib["nxdlbase"][ + parent.attrib["nxdlbase"] + .rfind("/") : parent.attrib["nxdlbase"] + .rfind(".nxdl") + ] + + # Case where the first parent is a base_class + if parent_path_segments[0] == "": + return "" + + # special treatment for NXnote@type + if ( + tag == "attribute" + and parent_def_name == "/NXnote" + and parent_path == "/type" + ): + return "" + + if tag == "attribute": + pos_of_right_slash = parent_path.rfind("/") + parent_path = ( + parent_path[:pos_of_right_slash] + + "@" + + parent_path[pos_of_right_slash + 1 :] + ) + parent_display_name = f"{parent_def_name[1:]}{parent_path}" + return f":ref:`⤆ `" + return "" diff --git a/dev_tools/nyaml2nxdl/README.md b/dev_tools/nyaml2nxdl/README.md new file mode 100644 index 000000000..ff083e189 --- /dev/null +++ b/dev_tools/nyaml2nxdl/README.md @@ -0,0 +1,72 @@ +# YAML to NXDL converter and NXDL to YAML converter + +**NOTE: Please use python3.8 or above to run this converter** + +**Tools purpose**: Offer a simple YAML-based schema and a XML-based schema to describe NeXus instances. These can be NeXus application definitions or classes +such as base or contributed classes. Users either create NeXus instances by writing a YAML file or a XML file which details a hierarchy of data/metadata elements. +The forward (YAML -> NXDL.XML) and backward (NXDL.XML -> YAML) conversions are implemented. + +**How the tool works**: +- yaml2nxdl.py +1. Reads the user-specified NeXus instance, either in YML or XML format. +2. If input is in YAML, creates an instantiated NXDL schema XML tree by walking the dictionary nest. + If input is in XML, creates a YML file walking the dictionary nest. +3. Write the tree into a YAML file or a properly formatted NXDL XML schema file to disk. +4. Optionally, if --append argument is given, + the XML or YAML input file is interpreted as an extension of a base class and the entries contained in it + are appended below a standard NeXus base class. + You need to specify both your input file (with YAML or XML extension) and NeXus class (with no extension). + Both .yml and .nxdl.xml file of the extended class are printed. 
+
+```console
+user@box:~$ python yaml2nxdl.py
+
+Usage: python yaml2nxdl.py [OPTIONS]
+
+Options:
+   --input-file TEXT      The path to the input data file to read.
+   --append TEXT          Parse an xml NeXus file and append it to the specified base class;
+                          write the base class name with no extension.
+   --check-consistency    Check consistency by generating another version of the input file.
+                          E.g. for the input file NXexample.nxdl.xml the output file is
+                          NXexample_consistency.nxdl.xml.
+   --verbose              Additional std output info is printed to help debugging.
+   --help                 Show this message and exit.
+
+```
+
+## Documentation
+
+**Rule set**: When transcoding YAML files, several rules have to be followed.
+* Named NeXus groups, which are instances of NeXus classes, especially base or contributed classes. Creating (NXbeam) is a simple example of a request to define a group named according to NeXus default rules. mybeam1(NXbeam) or mybeam2(NXbeam) are examples of how to create multiple named instances at the same hierarchy level.
+* Members of groups, so-called fields or attributes. A simple example of a member is voltage. Here the datatype is implied automatically as the default NeXus NX_CHAR type. By contrast, voltage(NX_FLOAT) can be used to instantiate a member of the class which should be of NeXus type NX_FLOAT.
+* Attributes of either groups or fields. Names of attributes have to be preceded by \@ to mark them as attributes.
+* Optionality: All fields, groups and attributes in `application definitions` are `required` by default, unless they are explicitly marked as `recommended` or `optional`. A YAML fragment combining these rules is sketched at the end of this README.
+
+**Special keywords**: Several keywords can be used as children of groups, fields, and attributes to specify their members. Groups, fields and attributes are nodes of the XML tree.
+* **doc**: A human-readable description/docstring
+* **exists** Options are recommended, required, [min, 1, max, infty]; numbers like the 1 here can be replaced by any uint, or infty to indicate no restriction on how frequently the entry can occur inside the NXDL schema at the same hierarchy level.
+* **link** Define links between nodes.
+* **units** A statement introducing NeXus-compliant NXDL units arguments, like NX_VOLTAGE
+* **dimensions** Details which dimensional arrays to expect
+* **enumeration** Python list of strings which are considered as recommended entries to choose from.
+* **dim_parameters** `dim` is a child of `dimension`, and `dim` may have several attributes (`ref`,
+`incr`) in addition to `index` and `value`. So when writing the `yaml` schema definition, please follow this structure:
+```
+dimensions:
+   rank: integer value
+   dim: [[ind_1, val_1], [ind_2, val_2], ...]
+   dim_parameters:
+      ref: [ref_value_1, ref_value_2, ...]
+      incr: [incr_value_1, incr_value_2, ...]
+```
+Keep in mind that the lengths of all the lists must be the same.
+
+## Next steps
+
+The NOMAD team is currently working on establishing a one-to-one mapping between
+NeXus definitions and the NOMAD MetaInfo. As soon as this is in place the YAML files will
+be annotated with further metadata so that they can serve two purposes.
+On the one hand they can serve as an instance for a schema to create a GUI representation
+of a NOMAD Oasis ELN schema. On the other hand the YAML to NXDL converter will skip all
+those pieces of information which are irrelevant from a NeXus perspective.
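+
+## Appendix: a minimal example
+
+The following sketch (a hypothetical `NXmybeam` fragment, not an official definition) combines the rules above; the YAML is embedded in a Python string only to keep this README's examples in one language:
+
+```python
+yaml_fragment = r"""
+mybeam1(NXbeam):        # a named instance of the NXbeam base class
+  voltage(NX_FLOAT):    # a field with an explicit NeXus type
+    unit: NX_VOLTAGE
+    exists: recommended
+    doc: |
+      Acceleration voltage of the beam.
+    \@units_note:       # an attribute, marked by the leading @
+      doc: |
+        Free-text note on the unit convention.
+"""
+```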
diff --git a/dev_tools/nyaml2nxdl/__init__.py b/dev_tools/nyaml2nxdl/__init__.py
new file mode 100644
index 000000000..22eb35f68
--- /dev/null
+++ b/dev_tools/nyaml2nxdl/__init__.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python3
+"""
+# Load paths
+"""
+# -*- coding: utf-8 -*-
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/dev_tools/nyaml2nxdl/comment_collector.py b/dev_tools/nyaml2nxdl/comment_collector.py
new file mode 100644
index 000000000..0041c14ec
--- /dev/null
+++ b/dev_tools/nyaml2nxdl/comment_collector.py
@@ -0,0 +1,519 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Collect comments in a list with the CommentCollector class. A comment is an instance of
+Comment, where each comment includes the comment text and the line info or neighbour info
+the comment must be assigned to.
+
+The class Comment is an abstract class for the general functions and methods implemented
+by the XMLComment and YAMLComment classes.
+
+NOTE: Here a comment block mainly stands for the comment text plus the line or element
+the comment is intended for.
+"""
+
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Tuple
+from typing import Type
+from typing import Union
+
+from .nyaml2nxdl_helper import LineLoader
+
+__all__ = ["Comment", "CommentCollector", "XMLComment", "YAMLComment"]
+
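+
+# A minimal usage sketch (hypothetical file name; the API is defined below):
+#
+#     collector = CommentCollector("NXmybeam_parsed.yaml")
+#     collector.extract_all_comment_blocks()
+#     for cmnt in collector:
+#         print(cmnt.get_comment_text())
+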
+ """ + + def __init__(self, input_file: str = None, loaded_obj: Union[object, Dict] = None): + """ + Initialise CommentCollector + parameters: + input_file: raw input file (xml, yml) + loaded_obj: file loaded by third party library + """ + self._comment_chain: List = [] + self.file = input_file + self._comment_tracker = 0 + self._comment_hash: Dict[Tuple, Type[Comment]] = {} + self.comment: Type[Comment] + if self.file and not loaded_obj: + if self.file.split(".")[-1] == "xml": + self.comment = XMLComment + if self.file.split(".")[-1] == "yaml": + self.comment = YAMLComment + with open(self.file, "r", encoding="utf-8") as plain_text_yaml: + loader = LineLoader(plain_text_yaml) + self.comment.__yaml_dict__ = loader.get_single_data() + elif self.file and loaded_obj: + if self.file.split(".")[-1] == "yaml" and isinstance(loaded_obj, dict): + self.comment = YAMLComment + self.comment.__yaml_dict__ = loaded_obj + else: + raise ValueError( + "Incorrect inputs for CommentCollector e.g. Wrong file extension." + ) + + else: + raise ValueError("Incorrect inputs for CommentCollector") + + def extract_all_comment_blocks(self): + """ + Collect all comments. Note that here comment means (comment text + element or line info + intended for comment. + """ + id_ = 0 + single_comment = self.comment(comment_id=id_) + with open(self.file, mode="r", encoding="UTF-8") as enc_f: + lines = enc_f.readlines() + # Make an empty line for last comment if no empty lines in original file + if lines[-1] != "": + lines.append("") + for line_num, line in enumerate(lines): + if single_comment.is_storing_single_comment(): + # If the last comment comes without post nxdl fields, groups and attributes + if "++ SHA HASH ++" in line: + # Handle with stored nxdl.xml file that is not part of yaml + line = "" + single_comment.process_each_line( + line + "post_comment", (line_num + 1) + ) + self._comment_chain.append(single_comment) + break + if line_num < (len(lines) - 1): + # Processing file from Line number 1 + single_comment.process_each_line(line, (line_num + 1)) + else: + # For processing last line of file + single_comment.process_each_line( + line + "post_comment", (line_num + 1) + ) + self._comment_chain.append(single_comment) + else: + self._comment_chain.append(single_comment) + single_comment = self.comment(last_comment=single_comment) + single_comment.process_each_line(line, (line_num + 1)) + + def get_comment(self): + """ + Return comment from comment_chain that must come earlier in order. + """ + return self._comment_chain[self._comment_tracker] + + def get_coment_by_line_info(self, comment_locs: Tuple[str, Union[int, str]]): + """ + Get comment using line information. + """ + if comment_locs in self._comment_hash: + return self._comment_hash[comment_locs] + + line_annot, line_loc = comment_locs + for cmnt in self._comment_chain: + if line_annot in cmnt: + line_loc_ = cmnt.get_line_number(line_annot) + if line_loc == line_loc_: + self._comment_hash[comment_locs] = cmnt + return cmnt + + def remove_comment(self, ind): + """Remove a comment from comment list.""" + if ind < len(self._comment_chain): + del self._comment_chain[ind] + else: + raise ValueError("Oops! Index is out of range.") + + def reload_comment(self): + """ + Update self._comment_tracker after done with last comment. + """ + self._comment_tracker += 1 + + def __contains__(self, comment_locs: tuple): + """ + Confirm wether the comment corresponds to key_line and line_loc + is exist or not. + comment_locs is equvalant to (line_annotation, line_loc) e.g. 
+        (__line__doc, 35).
+        """
+        if not isinstance(comment_locs, tuple):
+            raise TypeError(
+                "Comment_locs should be a 'tuple' containing the line annotation "
+                "(e.g. __line__doc) and line_loc (e.g. 35)."
+            )
+        line_annot, line_loc = comment_locs
+        for cmnt in self._comment_chain:
+            if line_annot in cmnt:
+                line_loc_ = cmnt.get_line_number(line_annot)
+                if line_loc == line_loc_:
+                    self._comment_hash[comment_locs] = cmnt
+                    return True
+        return False
+
+    def __getitem__(self, ind):
+        """Get a comment from self._comment_chain by index."""
+        if isinstance(ind, int):
+            if ind >= len(self._comment_chain):
+                raise IndexError(
+                    f"Oops! Comment index {ind} in {__class__} is out of range!"
+                )
+            return self._comment_chain[ind]
+
+        if isinstance(ind, slice):
+            start_n = ind.start or 0
+            end_n = ind.stop or len(self._comment_chain)
+            return self._comment_chain[start_n:end_n]
+
+    def __iter__(self):
+        """Get comments iteratively."""
+        return iter(self._comment_chain)
+
+
+# pylint: disable=too-many-instance-attributes
+class Comment:
+    """
+    This class builds a yaml comment together with the intended line the comment is written for.
+    """
+
+    def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None:
+        """A Comment object can be considered a block element that includes
+        a document element (an entity the comment is written for).
+        """
+        self._elemt: Any = None
+        self._elemt_text: str = None
+        self._is_elemt_found: bool = None
+        self._is_elemt_stored: bool = None
+
+        self._comnt: str = ""
+        # If there are multiple comments for one element or entity
+        self._comnt_list: List[str] = []
+        self.last_comment: "Comment" = last_comment if last_comment else None
+        if comment_id >= 0 and last_comment:
+            self.cid = comment_id
+            self.last_comment = last_comment
+        elif comment_id == 0 and not last_comment:
+            self.cid = comment_id
+            self.last_comment = None
+        elif last_comment:
+            self.cid = self.last_comment.cid + 1
+            self.last_comment = last_comment
+        else:
+            raise ValueError("Neither a last comment nor a comment id was found")
+        self._comnt_start_found: bool = False
+        self._comnt_end_found: bool = False
+        self.is_storing_single_comment = lambda: not (
+            self._comnt_end_found and self._is_elemt_stored
+        )
+
+    def get_comment_text(self) -> Union[List, str]:
+        """
+        Extract the comment text from the entire comment (comment text + element or
+        line the comment is intended for).
+        """
+
+    def append_comment(self, text: str) -> None:
+        """
+        Append lines of the same comment.
+        """
+
+    def store_element(self, args) -> None:
+        """
+        Store the comment text and the line or element the comment is intended for.
+        """
+
+
+class XMLComment(Comment):
+    """
+    XMLComment to store an xml comment element.
+    """
+
+    def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None:
+        super().__init__(comment_id, last_comment)
+
+    def process_each_line(self, text, line_num):
+        """Take care of each line of text. The function each piece of text
+        must be passed through is decided here.
+ """ + text = text.strip() + if text and line_num: + self.append_comment(text) + if self._comnt_end_found and not self._is_elemt_found: + # for multiple comment if exist + if self._comnt: + self._comnt_list.append(self._comnt) + self._comnt = "" + + if self._comnt_end_found: + self.store_element(text) + + def append_comment(self, text: str) -> None: + # Comment in single line + if "" == text[-4:]: + self._comnt_end_found = True + self._comnt_start_found = False + self._comnt = self._comnt.replace("-->", "") + + elif "-->" == text[0:4] and self._comnt_start_found: + self._comnt_end_found = True + self._comnt_start_found = False + self._comnt = self._comnt + "\n" + text.replace("-->", "") + elif self._comnt_start_found: + self._comnt = self._comnt + "\n" + text + + # pylint: disable=arguments-differ, arguments-renamed + def store_element(self, text) -> None: + def collect_xml_attributes(text_part): + for part in text_part: + part = part.strip() + if part and '">' == "".join(part[-2:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = "".join(part[0:-2]) + elif part and '"/>' == "".join(part[-3:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = "".join(part[0:-3]) + elif part and "/>" == "".join(part[-2:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = "".join(part[0:-2]) + elif part and ">" == part[-1]: + self._is_elemt_stored = True + self._is_elemt_found = False + part = "".join(part[0:-1]) + elif part and '"' == part[-1]: + part = "".join(part[0:-1]) + + if '="' in part: + lf_prt, rt_prt = part.split('="') + else: + continue + if ":" in lf_prt: + continue + self._elemt[lf_prt] = str(rt_prt) + + if not self._elemt: + self._elemt = {} + # First check for comment part has been collected prefectly + if " Union[List, str]: + """ + This method returns list of commnent text. As some xml element might have + multiple separated comment intended for a single element. + """ + return self._comnt_list + + +class YAMLComment(Comment): + """ + This class for stroing comment text as well as location of the comment e.g. line + number of other in the file. + NOTE: + 1. Do not delete any element form yaml dictionary (for loaded_obj. check: Comment_collector + class. because this loaded file has been exploited in nyaml2nxdl forward tools.) + """ + + # Class level variable. The main reason behind that to follow structure of + # abstract class 'Comment' + __yaml_dict__: dict = {} + __yaml_line_info: dict = {} + __comment_escape_char = {"--": "-\\-"} + + def __init__(self, comment_id: int = -1, last_comment: "Comment" = None) -> None: + """Initialization of YAMLComment follow Comment class.""" + super().__init__(comment_id, last_comment) + self.collect_yaml_line_info( + YAMLComment.__yaml_dict__, YAMLComment.__yaml_line_info + ) + + def process_each_line(self, text, line_num): + """Take care of each line of text. Through which function the text + must be passed should be decide here. 
+ """ + text = text.strip() + self.append_comment(text) + if self._comnt_end_found and not self._is_elemt_found: + if self._comnt: + self._comnt_list.append(self._comnt) + self._comnt = "" + + if self._comnt_end_found: + line_key = "" + if ":" in text: + ind = text.index(":") + line_key = "__line__" + "".join(text[0:ind]) + + for l_num, l_key in self.__yaml_line_info.items(): + if line_num == int(l_num) and line_key == l_key: + self.store_element(line_key, line_num) + break + # Comment comes very end of the file + if text == "post_comment" and line_key == "": + line_key = "__line__post_comment" + self.store_element(line_key, line_num) + + def has_post_comment(self): + """ + Ensure is this a post coment or not. + Post comment means the comment that come at the very end without having any + nxdl element(class, group, filed and attribute.) + """ + for key, _ in self._elemt.items(): + if "__line__post_comment" == key: + return True + return False + + def append_comment(self, text: str) -> None: + """ + Collects all the line of the same comment and + append them with that single comment. + """ + # check for escape char + text = self.replace_scape_char(text) + # Empty line after last line of comment + if not text and self._comnt_start_found: + self._comnt_end_found = True + self._comnt_start_found = False + # For empty line inside doc or yaml file. + elif not text: + return + elif "# " == "".join(text[0:2]): + self._comnt_start_found = True + self._comnt_end_found = False + self._comnt = "" if not self._comnt else self._comnt + "\n" + self._comnt = self._comnt + "".join(text[2:]) + elif "#" == text[0]: + self._comnt_start_found = True + self._comnt_end_found = False + self._comnt = "" if not self._comnt else self._comnt + "\n" + self._comnt = self._comnt + "".join(text[1:]) + elif "post_comment" == text: + self._comnt_end_found = True + self._comnt_start_found = False + # for any line after 'comment block' found + elif self._comnt_start_found: + self._comnt_start_found = False + self._comnt_end_found = True + + # pylint: disable=arguments-differ + def store_element(self, line_key, line_number): + """ + Store comment content and information of commen location (for what comment is + created.). + """ + self._elemt = {} + self._elemt[line_key] = int(line_number) + self._is_elemt_found = False + self._is_elemt_stored = True + + def get_comment_text(self): + """ + Return list of comments if there are multiple comment for same yaml line. + """ + return self._comnt_list + + def get_line_number(self, line_key): + """ + Retrun line number for what line the comment is created + """ + return self._elemt[line_key] + + def get_line_info(self): + """ + Return line annotation and line number from a comment. + """ + for line_anno, line_loc in self._elemt.items(): + return line_anno, line_loc + + def replace_scape_char(self, text): + """Replace escape char according to __comment_escape_char dict""" + for ecp_char, ecp_alt in YAMLComment.__comment_escape_char.items(): + if ecp_char in text: + text = text.replace(ecp_char, ecp_alt) + return text + + def get_element_location(self): + """ + Retrun yaml line '__line__KEY' info and and line numner + """ + if len(self._elemt) > 1: + raise ValueError(f"Comment element should be one but got " f"{self._elemt}") + + for key, val in self._elemt.items(): + yield key, val + + def collect_yaml_line_info(self, yaml_dict, line_info_dict): + """Collect __line__key and corresponding value from + a yaml file dictonary in another dictionary. 
+ """ + for line_key, line_n in yaml_dict.items(): + if "__line__" in line_key: + line_info_dict[line_n] = line_key + + for _, val in yaml_dict.items(): + if isinstance(val, dict): + self.collect_yaml_line_info(val, line_info_dict) + + def __contains__(self, line_key): + """For Checking whether __line__NAME is in _elemt dict or not.""" + return line_key in self._elemt + + def __eq__(self, comment_obj): + """Check the self has same value as right comment.""" + if len(self._comnt_list) != len(comment_obj._comnt_list): + return False + for left_cmnt, right_cmnt in zip(self._comnt_list, comment_obj._comnt_list): + left_cmnt = left_cmnt.split("\n") + right_cmnt = right_cmnt.split("\n") + for left_line, right_line in zip(left_cmnt, right_cmnt): + if left_line.strip() != right_line.strip(): + return False + return True diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl.py b/dev_tools/nyaml2nxdl/nyaml2nxdl.py new file mode 100755 index 000000000..dccfff6e4 --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +"""Main file of yaml2nxdl tool. +Users create NeXus instances by writing a YAML file +which details a hierarchy of data/metadata elements + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import xml.etree.ElementTree as ET + +import click + +from .nyaml2nxdl_backward_tools import Nxdl2yaml +from .nyaml2nxdl_backward_tools import compare_niac_and_my +from .nyaml2nxdl_forward_tools import nyaml2nxdl +from .nyaml2nxdl_forward_tools import pretty_print_xml +from .nyaml2nxdl_helper import extend_yamlfile_with_comment +from .nyaml2nxdl_helper import get_sha256_hash +from .nyaml2nxdl_helper import separate_hash_yaml_and_nxdl + +DEPTH_SIZE = 4 * " " + +# NOTE: Some handful links for nyaml2nxdl converter: +# https://manual.nexusformat.org/nxdl_desc.html?highlight=optional + + +def generate_nxdl_or_retrieve_nxdl(yaml_file, out_xml_file, verbose): + """ + Generate yaml, nxdl and hash. + if the extracted hash is exactly the same as producd from generated yaml then + retrieve the nxdl part from provided yaml. + Else, generate nxdl from separated yaml with the help of nyaml2nxdl function + """ + pa_path, rel_file = os.path.split(yaml_file) + sep_yaml = os.path.join(pa_path, f"temp_{rel_file}") + hash_found = separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, out_xml_file) + + if hash_found: + gen_hash = get_sha256_hash(sep_yaml) + if hash_found == gen_hash: + os.remove(sep_yaml) + return + + nyaml2nxdl(sep_yaml, out_xml_file, verbose) + os.remove(sep_yaml) + + +# pylint: disable=too-many-locals +def append_yml(input_file, append, verbose): + """Append to an existing NeXus base class new elements provided in YML input file \ +and print both an XML and YML file of the extended base class. 
+ +""" + nexus_def_path = os.path.join( + os.path.abspath(os.path.dirname(__file__)), "../../definitions" + ) + assert [ + s + for s in os.listdir(os.path.join(nexus_def_path, "base_classes")) + if append.strip() == s.replace(".nxdl.xml", "") + ], "Your base class extension does not match any existing NeXus base classes" + tree = ET.parse( + os.path.join(nexus_def_path + "/base_classes", append + ".nxdl.xml") + ) + root = tree.getroot() + # warning: tmp files are printed on disk and removed at the ends!! + pretty_print_xml(root, "tmp.nxdl.xml") + input_tmp_xml = "tmp.nxdl.xml" + out_tmp_yml = "tmp_parsed.yaml" + converter = Nxdl2yaml([], []) + converter.print_yml(input_tmp_xml, out_tmp_yml, verbose) + nyaml2nxdl(input_file=out_tmp_yml, out_file="tmp_parsed.nxdl.xml", verbose=verbose) + tree = ET.parse("tmp_parsed.nxdl.xml") + tree2 = ET.parse(input_file) + root_no_duplicates = ET.Element( + "definition", + { + "xmlns": "http://definition.nexusformat.org/nxdl/3.1", + "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "xsi:schemaLocation": "http://www.w3.org/2001/XMLSchema-instance", + }, + ) + for attribute_keys in root.attrib.keys(): + if ( + attribute_keys + != "{http://www.w3.org/2001/XMLSchema-instance}schemaLocation" + ): + attribute_value = root.attrib[attribute_keys] + root_no_duplicates.set(attribute_keys, attribute_value) + for elems in root.iter(): + if "doc" in elems.tag: + root_doc = ET.SubElement(root_no_duplicates, "doc") + root_doc.text = elems.text + break + group = "{http://definition.nexusformat.org/nxdl/3.1}group" + root_no_duplicates = compare_niac_and_my( + tree, tree2, verbose, group, root_no_duplicates + ) + field = "{http://definition.nexusformat.org/nxdl/3.1}field" + root_no_duplicates = compare_niac_and_my( + tree, tree2, verbose, field, root_no_duplicates + ) + attribute = "{http://definition.nexusformat.org/nxdl/3.1}attribute" + root_no_duplicates = compare_niac_and_my( + tree, tree2, verbose, attribute, root_no_duplicates + ) + pretty_print_xml( + root_no_duplicates, + f"{input_file.replace('.nxdl.xml', '')}" f"_appended.nxdl.xml", + ) + + input_file_xml = input_file.replace(".nxdl.xml", "_appended.nxdl.xml") + out_file_yml = input_file.replace(".nxdl.xml", "_appended_parsed.yaml") + converter = Nxdl2yaml([], []) + converter.print_yml(input_file_xml, out_file_yml, verbose) + nyaml2nxdl( + input_file=out_file_yml, + out_file=out_file_yml.replace(".yaml", ".nxdl.xml"), + verbose=verbose, + ) + os.rename( + f"{input_file.replace('.nxdl.xml', '_appended_parsed.yaml')}", + f"{input_file.replace('.nxdl.xml', '_appended.yaml')}", + ) + os.rename( + f"{input_file.replace('.nxdl.xml', '_appended_parsed.nxdl.xml')}", + f"{input_file.replace('.nxdl.xml', '_appended.nxdl.xml')}", + ) + os.remove("tmp.nxdl.xml") + os.remove("tmp_parsed.yaml") + os.remove("tmp_parsed.nxdl.xml") + + +def split_name_and_extension(file_name): + """ + Split file name into extension and rest of the file name. 
+    Return the raw file name and the extension.
+    """
+    path = file_name.rsplit("/", 1)
+    (pathn, filen) = ["", path[0]] if len(path) == 1 else [path[0] + "/", path[1]]
+    parts = filen.rsplit(".", 2)
+    raw = ext = ""
+    if len(parts) == 2:
+        raw = parts[0]
+        ext = parts[1]
+    elif len(parts) == 3:
+        raw = parts[0]
+        ext = ".".join(parts[1:])
+
+    return pathn + raw, ext
+
+
+@click.command()
+@click.option(
+    "--input-file",
+    required=True,
+    prompt=True,
+    help="The path to the XML or YAML input data file to read and create \
+a YAML or XML file from, respectively.",
+)
+@click.option(
+    "--append",
+    help="Parse an xml file and append it to a base class, given that the xml file has the same name \
+as an existing base class",
+)
+@click.option(
+    "--check-consistency",
+    is_flag=True,
+    default=False,
+    help=(
+        "Check whether the yaml or nxdl file follows the general schema rules and "
+        "whether your comments are in the right place. The option renders an "
+        "output file of the same extension (*_consistency.yaml or *_consistency.nxdl.xml)."
+    ),
+)
+@click.option(
+    "--verbose",
+    is_flag=True,
+    default=False,
+    help="Print keywords and value types to standard output to help debug \
+possible issues in yaml files",
+)
+def launch_tool(input_file, verbose, append, check_consistency):
+    """
+    Main function that distinguishes the input file format and launches the tools.
+    """
+    if os.path.isfile(input_file):
+        raw_name, ext = split_name_and_extension(input_file)
+    else:
+        raise ValueError("Need a valid input file.")
+
+    if ext == "yaml":
+        xml_out_file = raw_name + ".nxdl.xml"
+        generate_nxdl_or_retrieve_nxdl(input_file, xml_out_file, verbose)
+        if append:
+            append_yml(raw_name + ".nxdl.xml", append, verbose)
+        # For the consistency run
+        if check_consistency:
+            yaml_out_file = raw_name + "_consistency." + ext
+            converter = Nxdl2yaml([], [])
+            converter.print_yml(xml_out_file, yaml_out_file, verbose)
+            os.remove(xml_out_file)
+    elif ext == "nxdl.xml":
+        if not append:
+            yaml_out_file = raw_name + "_parsed" + ".yaml"
+            converter = Nxdl2yaml([], [])
+            converter.print_yml(input_file, yaml_out_file, verbose)
+            # Append the nxdl.xml file to the yaml output file
+            yaml_hash = get_sha256_hash(yaml_out_file)
+            # Lines acting as a divider between yaml and nxdl
+            top_lines = [
+                (
+                    "\n# ++++++++++++++++++++++++++++++++++ SHA HASH"
+                    " ++++++++++++++++++++++++++++++++++\n"
+                ),
+                f"# {yaml_hash}\n",
+            ]
+
+            extend_yamlfile_with_comment(
+                yaml_file=yaml_out_file,
+                file_to_be_appended=input_file,
+                top_lines_list=top_lines,
+            )
+        else:
+            append_yml(input_file, append, verbose)
+        # Taking care of the consistency run
+        if check_consistency:
+            xml_out_file = raw_name + "_consistency." + ext
+            generate_nxdl_or_retrieve_nxdl(yaml_out_file, xml_out_file, verbose)
+            os.remove(yaml_out_file)
+    else:
+        raise ValueError("Provide a correct file with extension '.yaml' or '.nxdl.xml'")
+
+
+if __name__ == "__main__":
+    launch_tool().parse()  # pylint: disable=no-value-for-parameter
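+
+# For programmatic use, the Click command can also be exercised via Click's test
+# runner; a sketch (hypothetical input file name):
+#
+#     from click.testing import CliRunner
+#     result = CliRunner().invoke(launch_tool, ["--input-file", "NXmybeam.nxdl.xml"])
+#     assert result.exit_code == 0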
diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py
new file mode 100755
index 000000000..dcf56b998
--- /dev/null
+++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py
@@ -0,0 +1,1046 @@
+#!/usr/bin/env python3
+"""This file collects the functions used in the reverse tool nxdl2yaml.
+
+"""
+import os
+
+# -*- coding: utf-8 -*-
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import xml.etree.ElementTree as ET
+from typing import Dict
+from typing import List
+
+from .nyaml2nxdl_helper import cleaning_empty_lines
+from .nyaml2nxdl_helper import get_node_parent_info
+from .nyaml2nxdl_helper import get_yaml_escape_char_dict
+from .nyaml2nxdl_helper import remove_namespace_from_tag
+
+DEPTH_SIZE = "  "
+CMNT_TAG = "!--"
+
+
+def separate_pi_comments(input_file):
+    """
+    Separate PI comments from the processing instructions (PI).
+    """
+    comments_list = []
+    comment = []
+    xml_lines = []
+
+    with open(input_file, "r", encoding="utf-8") as file:
+        lines = file.readlines()
+        has_pi = True
+        for line in lines:
+            c_start = "<!--"
+            cmnt_end = "-->"
+            def_tag = "<definition"
+
+            if c_start in line and has_pi:
+                line = line.replace(c_start, "")
+                if cmnt_end in line:
+                    line = line.replace(cmnt_end, "")
+                    comments_list.append("".join([*comment, line]))
+                    comment = []
+                else:
+                    comment.append(line)
+            elif cmnt_end in line and len(comment) > 0 and has_pi:
+                comment.append(line.replace(cmnt_end, ""))
+                comments_list.append("".join(comment))
+                comment = []
+            elif def_tag in line or not has_pi:
+                has_pi = False
+                xml_lines.append(line)
+            elif len(comment) > 0 and has_pi:
+                comment.append(line)
+            else:
+                xml_lines.append(line)
+    return comments_list, "".join(xml_lines)
+
+
+# Collected: https://dustinoprea.com/2019/01/22/python-parsing-xml-and-retaining-the-comments/
+class _CommentedTreeBuilder(ET.TreeBuilder):
+    def comment(self, text):
+        """
+        Define a comment builder in the TreeBuilder.
+        """
+        self.start("!--", {})
+        self.data(text)
+        self.end("--")
+
+
+def parse(filepath):
+    """
+    Construct a parse function that includes the modified TreeBuilder
+    and rebuilds the XMLParser.
+    """
+    comments, xml_str = separate_pi_comments(filepath)
+    ctb = _CommentedTreeBuilder()
+    xp_parser = ET.XMLParser(target=ctb)
+    root = ET.fromstring(xml_str, parser=xp_parser)
+    return comments, root
+
+
+def handle_mapping_char(text, depth=-1, skip_n_line_on_top=False):
+    """Check for the ":" char and replace it by "':'"."""
+
+    escape_char = get_yaml_escape_char_dict()
+    for esc_key, val in escape_char.items():
+        if esc_key in text:
+            text = text.replace(esc_key, val)
+    if not skip_n_line_on_top:
+        if depth > 0:
+            text = add_new_line_with_pipe_on_top(text, depth)
+        else:
+            raise ValueError("Need a depth size to coordinate the text line in the yaml file.")
+    return text
+
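+
+# Example of the escape handling above, assuming get_yaml_escape_char_dict()
+# maps ":" to "':'" (as handle_mapping_char's docstring states):
+#
+#     handle_mapping_char("ratio: a/b", depth=2, skip_n_line_on_top=False)
+#     returns "|\n" + 2 * DEPTH_SIZE + "ratio':' a/b"
+#     (a '|' block marker line is prepended because the text contains ':')
+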
+ """ + char_list_to_add_new_line_on_top_of_text = [":"] + for char in char_list_to_add_new_line_on_top_of_text: + if char in text: + return "|" + "\n" + depth * DEPTH_SIZE + text + return text + + +# pylint: disable=too-many-instance-attributes +class Nxdl2yaml: + """ + Parse XML file and print a YML file + """ + + def __init__( + self, + symbol_list: List[str], + root_level_definition: List[str], + root_level_doc="", + root_level_symbols="", + ): + # updated part of yaml_dict + self.found_definition = False + self.root_level_doc = root_level_doc + self.root_level_symbols = root_level_symbols + self.root_level_definition = root_level_definition + self.symbol_list = symbol_list + self.is_last_element_comment = False + self.include_comment = True + self.pi_comments = None + # NOTE: Here is how root_level_comments organised for storing comments + # root_level_comment= {'root_doc': comment, + # 'symbols': comment, + # The 'symbol_doc_comments' list is for comments from all 'symbol doc' + # 'symbol_doc_comments' : [comments] + # 'symbol_list': [symbols], + # The 'symbol_comments' contains comments for 'symbols doc' and all 'symbol' + # 'symbol_comments': [comments]} + self.root_level_comment: Dict[str, str] = {} + + def print_yml(self, input_file, output_yml, verbose): + """ + Parse an XML file provided as input and print a YML file + """ + if os.path.isfile(output_yml): + os.remove(output_yml) + + depth = 0 + + self.pi_comments, root = parse(input_file) + xml_tree = {"tree": root, "node": root} + self.xmlparse(output_yml, xml_tree, depth, verbose) + + def handle_symbols(self, depth, node): + """Handle symbols field and its childs symbol""" + + # pylint: disable=consider-using-f-string + self.root_level_symbols = ( + f"{remove_namespace_from_tag(node.tag)}: " + f"{node.text.strip() if node.text else ''}" + ) + depth += 1 + last_comment = "" + sbl_doc_cmnt_list = [] + # Comments that come above symbol tag + symbol_cmnt_list = [] + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + if tag == CMNT_TAG and self.include_comment: + last_comment = self.comvert_to_ymal_comment( + depth * DEPTH_SIZE, child.text + ) + if tag == "doc": + symbol_cmnt_list.append(last_comment) + # The bellow line is for handling lenth of 'symbol_comments' and + # 'symbol_doc_comments'. 
+                # 'symbol_doc_comments'. Otherwise print_root_level_info() gets an
+                # inconsistency in the for loop while writing comments to the file.
+                sbl_doc_cmnt_list.append("")
+                last_comment = ""
+                self.symbol_list.append(
+                    self.handle_not_root_level_doc(depth, text=child.text)
+                )
+            elif tag == "symbol":
+                # place holder is the symbol name
+                symbol_cmnt_list.append(last_comment)
+                last_comment = ""
+                if "doc" in child.attrib:
+                    self.symbol_list.append(
+                        self.handle_not_root_level_doc(
+                            depth, tag=child.attrib["name"], text=child.attrib["doc"]
+                        )
+                    )
+                else:
+                    for symbol_doc in list(child):
+                        tag = remove_namespace_from_tag(symbol_doc.tag)
+                        if tag == CMNT_TAG and self.include_comment:
+                            last_comment = self.comvert_to_ymal_comment(
+                                depth * DEPTH_SIZE, symbol_doc.text
+                            )
+                        if tag == "doc":
+                            sbl_doc_cmnt_list.append(last_comment)
+                            last_comment = ""
+                            self.symbol_list.append(
+                                self.handle_not_root_level_doc(
+                                    depth,
+                                    tag=child.attrib["name"],
+                                    text=symbol_doc.text,
+                                )
+                            )
+        self.store_root_level_comments("symbol_doc_comments", sbl_doc_cmnt_list)
+        self.store_root_level_comments("symbol_comments", symbol_cmnt_list)
+
+    def store_root_level_comments(self, holder, comment):
+        """Store the yaml text or section line and the comments intended for that line or section."""
+
+        self.root_level_comment[holder] = comment
+
+    def handle_definition(self, node):
+        """
+        Handle the definition group and its attributes.
+        NOTE: Here we tried to store the order of the xml element attributes, so that we get
+        exactly the same file back in nxdl from yaml.
+        """
+        # pylint: disable=consider-using-f-string
+        # self.root_level_definition[0] = ''
+        keyword = ""
+        # tmp_word for reserving the location
+        tmp_word = "#xx#"
+        attribs = node.attrib
+        # for tracking the order of name and type
+        keyword_order = -1
+        for item in attribs:
+            if "name" in item:
+                keyword = keyword + attribs[item]
+                if keyword_order == -1:
+                    self.root_level_definition.append(tmp_word)
+                    keyword_order = self.root_level_definition.index(tmp_word)
+            elif "extends" in item:
+                keyword = f"{keyword}({attribs[item]})"
+                if keyword_order == -1:
+                    self.root_level_definition.append(tmp_word)
+                    keyword_order = self.root_level_definition.index(tmp_word)
+            elif "schemaLocation" not in item and "extends" != item:
+                text = f"{item}: {attribs[item]}"
+                self.root_level_definition.append(text)
+        self.root_level_definition[keyword_order] = f"{keyword}:"
+
+    def handle_root_level_doc(self, node):
+        """
+        Handle the documentation field found at root level.
+        """
+        # tag = remove_namespace_from_tag(node.tag)
+        text = node.text
+        text = self.handle_not_root_level_doc(depth=0, text=text)
+        self.root_level_doc = text
+
+    # pylint: disable=too-many-branches
+    def handle_not_root_level_doc(self, depth, text, tag="doc", file_out=None):
+        """
+        Handle doc fields along the yaml file. In this function we also try to keep
+        track of the intended indentation, e.g. for the doc block below:
+        * Topic name
+          Description of topic
+        """
+
+        # Handling empty doc
+        if not text:
+            text = ""
+        else:
+            text = handle_mapping_char(text, -1, True)
+        if "\n" in text:
+            # To remove the '\n' character as it will be added before the text.
+            text = cleaning_empty_lines(text.split("\n"))
+            text_tmp = []
+            yaml_indent_n = len((depth + 1) * DEPTH_SIZE)
+            # Find the indentation in the first text line with an alphabet
+            tmp_i = 0
+            while tmp_i != -1:
+                first_line_indent_n = 0
+                # Taking care of empty text without any character
+                if len(text) == 1 and text[0] == "":
+                    break
+                for ch_ in text[tmp_i]:
+                    if ch_ == " ":
+                        first_line_indent_n = first_line_indent_n + 1
+                    elif ch_ != "":
+                        tmp_i = -2
+                        break
+                tmp_i = tmp_i + 1
+            # Taking care of a doc like the one below:
+            # Text lines
+            # text continues
+            # So there is no indentation at the start of the doc. The doc group will follow the
+            # general alignment
+            if first_line_indent_n == 0:
+                first_line_indent_n = yaml_indent_n
+
+            # for a -ve indent_diff all lines will move left by the same amount
+            # for a +ve indent_diff all lines will move right by the same amount
+            indent_diff = yaml_indent_n - first_line_indent_n
+            # Check whether the first line is empty; if not, keep the first line empty
+
+            for _, line in enumerate(text):
+                line_indent_n = 0
+                # Collect the first empty spaces without an alphabet
+                for ch_ in line:
+                    if ch_ == " ":
+                        line_indent_n = line_indent_n + 1
+                    else:
+                        break
+                line_indent_n = line_indent_n + indent_diff
+                if line_indent_n < yaml_indent_n:
+                    # if the line is still under the yaml indentation
+                    text_tmp.append(yaml_indent_n * " " + line.strip())
+                else:
+                    text_tmp.append(line_indent_n * " " + line.strip())
+
+            text = "\n" + "\n".join(text_tmp)
+            if "}" in tag:
+                tag = remove_namespace_from_tag(tag)
+            indent = depth * DEPTH_SIZE
+        elif text:
+            text = "\n" + (depth + 1) * DEPTH_SIZE + text.strip()
+            if "}" in tag:
+                tag = remove_namespace_from_tag(tag)
+            indent = depth * DEPTH_SIZE
+        else:
+            text = ""
+            if "}" in tag:
+                tag = remove_namespace_from_tag(tag)
+            indent = depth * DEPTH_SIZE
+
+        doc_str = f"{indent}{tag}: |{text}\n"
+        if file_out:
+            file_out.write(doc_str)
+            return None
+        return doc_str
+
+    def write_out(self, indent, text, file_out):
+        """
+        Write a text line to the output file.
+ """ + line_string = f"{indent}{text.rstrip()}\n" + file_out.write(line_string) + + def print_root_level_doc(self, file_out): + """ + Print at the root level of YML file \ + the general documentation field found in XML file + """ + indent = 0 * DEPTH_SIZE + + if ( + "root_doc" in self.root_level_comment + and self.root_level_comment["root_doc"] != "" + ): + text = self.root_level_comment["root_doc"] + self.write_out(indent, text, file_out) + + text = self.root_level_doc + self.write_out(indent, text, file_out) + self.root_level_doc = "" + + def comvert_to_ymal_comment(self, indent, text): + """ + Convert into yaml comment by adding exta '#' char in front of comment lines + """ + lines = text.split("\n") + mod_lines = [] + for line in lines: + line = line.strip() + if line and line[0] != "#": + line = indent + "# " + line + mod_lines.append(line) + elif line: + line = indent + line + mod_lines.append(line) + # The starting '\n' to keep multiple comments separate + return "\n" + "\n".join(mod_lines) + + def print_root_level_info(self, depth, file_out): + """ + Print at the root level of YML file \ + the information stored as definition attributes in the XML file + """ + # pylint: disable=consider-using-f-string + if depth < 0: + raise ValueError("Somthing wrong with indentaion in root level.") + + has_categoty = False + for def_line in self.root_level_definition: + if def_line in ("category: application", "category: base"): + self.write_out(indent=0 * DEPTH_SIZE, text=def_line, file_out=file_out) + # file_out.write(f"{def_line}\n") + has_categoty = True + + if not has_categoty: + raise ValueError( + "Definition dose not get any category from 'base or application'." + ) + self.print_root_level_doc(file_out) + if ( + "symbols" in self.root_level_comment + and self.root_level_comment["symbols"] != "" + ): + indent = depth * DEPTH_SIZE + text = self.root_level_comment["symbols"] + self.write_out(indent, text, file_out) + if self.root_level_symbols: + self.write_out( + indent=0 * DEPTH_SIZE, text=self.root_level_symbols, file_out=file_out + ) + # symbol_list include 'symbols doc', and all 'symbol' + for ind, symbol in enumerate(self.symbol_list): + # Taking care of comments that come on to of 'symbols doc' and 'symbol' + if ( + "symbol_comments" in self.root_level_comment + and self.root_level_comment["symbol_comments"][ind] != "" + ): + indent = depth * DEPTH_SIZE + self.write_out( + indent, + self.root_level_comment["symbol_comments"][ind], + file_out, + ) + if ( + "symbol_doc_comments" in self.root_level_comment + and self.root_level_comment["symbol_doc_comments"][ind] != "" + ): + indent = depth * DEPTH_SIZE + self.write_out( + indent, + self.root_level_comment["symbol_doc_comments"][ind], + file_out, + ) + + self.write_out(indent=(0 * DEPTH_SIZE), text=symbol, file_out=file_out) + if len(self.pi_comments) > 1: + indent = DEPTH_SIZE * depth + # The first comment is top level copy-right doc string + for comment in self.pi_comments[1:]: + self.write_out( + indent, self.comvert_to_ymal_comment(indent, comment), file_out + ) + if self.root_level_definition: + # Soring NXname for writting end of the definition attributes + nx_name = "" + for defs in self.root_level_definition: + if "NX" in defs and defs[-1] == ":": + nx_name = defs + continue + if defs in ("category: application", "category: base"): + continue + self.write_out(indent=0 * DEPTH_SIZE, text=defs, file_out=file_out) + self.write_out(indent=0 * DEPTH_SIZE, text=nx_name, file_out=file_out) + self.found_definition = False + + def 
+    def handle_exists(self, exists_dict, key, val):
+        """
+        Create the exists component as follows:
+
+        {'min' : value for min,
+         'max' : value for max,
+         'optional' : value for optional}
+
+        This is created separately so that the keys stay in order.
+        """
+        if not val:
+            val = ""
+        else:
+            val = str(val)
+        if "minOccurs" == key:
+            exists_dict["minOccurs"] = ["min", val]
+        if "maxOccurs" == key:
+            exists_dict["maxOccurs"] = ["max", val]
+        if "optional" == key:
+            exists_dict["optional"] = ["optional", val]
+        if "recommended" == key:
+            exists_dict["recommended"] = ["recommended", val]
+        if "required" == key:
+            exists_dict["required"] = ["required", val]
+
+    # pylint: disable=too-many-branches, consider-using-f-string
+    def handle_group_or_field(self, depth, node, file_out):
+        """Handle all the possible attributes that come along a field or group."""
+
+        allowed_attr = [
+            "optional",
+            "recommended",
+            "name",
+            "type",
+            "axes",
+            "axis",
+            "data_offset",
+            "interpretation",
+            "long_name",
+            "maxOccurs",
+            "minOccurs",
+            "nameType",
+            "optional",
+            "primary",
+            "signal",
+            "stride",
+            "units",
+            "required",
+            "deprecated",
+            "exists",
+        ]
+
+        name_type = ""
+        node_attr = node.attrib
+        rm_key_list = []
+        # Maintain order: name and type in the form name(type) or (type)name come first
+        for key, val in node_attr.items():
+            if key == "name":
+                name_type = name_type + val
+                rm_key_list.append(key)
+            if key == "type":
+                name_type = name_type + "(%s)" % val
+                rm_key_list.append(key)
+        if not name_type:
+            raise ValueError(
+                f"No 'name' or 'type' has been found, but a 'group' or 'field' "
+                f"must have at least a name. We got attributes: {node_attr}"
+            )
+        file_out.write(
+            "{indent}{name_type}:\n".format(
+                indent=depth * DEPTH_SIZE, name_type=name_type
+            )
+        )
+
+        for key in rm_key_list:
+            del node_attr[key]
+
+        # tmp_dict is intended to preserve the order of the attributes
+        tmp_dict = {}
+        exists_dict = {}
+        for key, val in node_attr.items():
+            # 'minOccurs', 'maxOccurs' and the optionality attributes all move into 'exists'
+            if key in ["minOccurs", "maxOccurs", "optional", "recommended", "required"]:
+                if "exists" not in tmp_dict:
+                    tmp_dict["exists"] = []
+                self.handle_exists(exists_dict, key, val)
+            elif key == "units":
+                tmp_dict["unit"] = str(val)
+            else:
+                tmp_dict[key] = str(val)
+            if key not in allowed_attr:
+                raise ValueError(
+                    f"An attribute ({key}) in 'field' or 'group' has been found "
+                    f"that is not allowed. The allowed attrs are {allowed_attr}."
+                )
+
+        if exists_dict:
+            for key, val in exists_dict.items():
+                if key in ["minOccurs", "maxOccurs"]:
+                    tmp_dict["exists"] = tmp_dict["exists"] + val
+                elif key in ["optional", "recommended", "required"]:
+                    tmp_dict["exists"] = key
+
+        depth_ = depth + 1
+        for key, val in tmp_dict.items():
+            # The depth size is increased inside handle_mapping_char() for writing text
+            # with one more level of indentation.
+            file_out.write(
+                f"{depth_ * DEPTH_SIZE}{key}: "
+                f"{handle_mapping_char(val, depth_ + 1, False)}\n"
+            )
+
+    # pylint: disable=too-many-branches, too-many-locals
+    def handle_dimension(self, depth, node, file_out):
+        """
+        Handle the dimension field.
+        NOTE: Usually we take care of any xml element in the xmlparse(...) and
+        recursion_in_xml_tree(...) functions. But here it is a bit different: the doc of
+        dimension and the attributes of dim are handled inside this function.
+ """ + # pylint: disable=consider-using-f-string + possible_dim_attrs = ["ref", "required", "incr", "refindex"] + possible_dimemsion_attrs = ["rank"] + + # taking care of Dimension tag + file_out.write( + "{indent}{tag}:\n".format( + indent=depth * DEPTH_SIZE, tag=node.tag.split("}", 1)[1] + ) + ) + # Taking care of dimension attributes + for attr, value in node.attrib.items(): + if attr in possible_dimemsion_attrs and not isinstance(value, dict): + indent = (depth + 1) * DEPTH_SIZE + file_out.write(f"{indent}{attr}: {value}\n") + else: + raise ValueError( + f"Dimension has got an attribute {attr} that is not valid." + f"Current the allowd atributes are {possible_dimemsion_attrs}." + f" Please have a look" + ) + # taking carew of dimension doc + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + if tag == "doc": + text = self.handle_not_root_level_doc(depth + 1, child.text) + file_out.write(text) + node.remove(child) + + dim_index_value = "" + dim_other_parts = {} + dim_cmnt_node = [] + # taking care of dim and doc childs of dimension + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + child_attrs = child.attrib + # taking care of index and value attributes + if tag == ("dim"): + # taking care of index and value in format [[index, value]] + dim_index_value = dim_index_value + "[{index}, {value}], ".format( + index=child_attrs["index"] if "index" in child_attrs else "", + value=child_attrs["value"] if "value" in child_attrs else "", + ) + if "index" in child_attrs: + del child_attrs["index"] + if "value" in child_attrs: + del child_attrs["value"] + + # Taking care of doc comes as child of dim + for cchild in list(child): + ttag = cchild.tag.split("}", 1)[1] + if ttag == ("doc"): + if ttag not in dim_other_parts: + dim_other_parts[ttag] = [] + text = cchild.text + dim_other_parts[ttag].append(text.strip()) + child.remove(cchild) + continue + # taking care of other attributes except index and value + for attr, value in child_attrs.items(): + if attr in possible_dim_attrs: + if attr not in dim_other_parts: + dim_other_parts[attr] = [] + dim_other_parts[attr].append(value) + if tag == CMNT_TAG and self.include_comment: + # Store and remove node so that comment nodes from dim node so + # that it does not call in xmlparser function + dim_cmnt_node.append(child) + node.remove(child) + + # All 'dim' element comments on top of 'dim' yaml key + if dim_cmnt_node: + for ch_nd in dim_cmnt_node: + self.handel_comment(depth + 1, ch_nd, file_out) + # index and value attributes of dim elements + file_out.write( + "{indent}dim: [{value}]\n".format( + indent=(depth + 1) * DEPTH_SIZE, value=dim_index_value[:-2] or "" + ) + ) + # Write the attributes, except index and value, and doc of dim as child of dim_parameter. + # But tthe doc or attributes for each dim come inside list according to the order of dim. + if dim_other_parts: + file_out.write( + "{indent}dim_parameters:\n".format(indent=(depth + 1) * DEPTH_SIZE) + ) + # depth = depth + 2 dim_paramerter has child such as doc of dim + indent = (depth + 2) * DEPTH_SIZE + for key, value in dim_other_parts.items(): + if key == "doc": + value = self.handle_not_root_level_doc( + depth + 2, str(value), key, file_out + ) + else: + # Increase depth size inside handle_map...() for writting text with one + # more indentation. 
+ file_out.write( + f"{indent}{key}: " + f"{handle_mapping_char(value, depth + 3, False)}\n" + ) + + def handle_enumeration(self, depth, node, file_out): + """ + Handle the enumeration field parsed from the xml file. + + If the enumeration items contain a doc field, the yaml file will contain items as child + fields of the enumeration field. + + If no doc are inherited in the enumeration items, a list of the items is given for the + enumeration list. + + """ + # pylint: disable=consider-using-f-string + + check_doc = [] + for child in list(node): + if list(child): + check_doc.append(list(child)) + # pylint: disable=too-many-nested-blocks + if check_doc: + file_out.write( + "{indent}{tag}: \n".format( + indent=depth * DEPTH_SIZE, tag=node.tag.split("}", 1)[1] + ) + ) + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + itm_depth = depth + 1 + if tag == ("item"): + file_out.write( + "{indent}{value}: \n".format( + indent=(itm_depth) * DEPTH_SIZE, value=child.attrib["value"] + ) + ) + + if list(child): + for item_doc in list(child): + if remove_namespace_from_tag(item_doc.tag) == "doc": + item_doc_depth = itm_depth + 1 + self.handle_not_root_level_doc( + item_doc_depth, + item_doc.text, + item_doc.tag, + file_out, + ) + if ( + remove_namespace_from_tag(item_doc.tag) == CMNT_TAG + and self.include_comment + ): + self.handel_comment(itm_depth + 1, item_doc, file_out) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(itm_depth + 1, child, file_out) + else: + enum_list = "" + remove_nodes = [] + for item_child in list(node): + tag = remove_namespace_from_tag(item_child.tag) + if tag == ("item"): + enum_list = enum_list + "{value}, ".format( + value=item_child.attrib["value"] + ) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(depth, item_child, file_out) + remove_nodes.append(item_child) + for ch_node in remove_nodes: + node.remove(ch_node) + + file_out.write( + "{indent}{tag}: [{enum_list}]\n".format( + indent=depth * DEPTH_SIZE, + tag=remove_namespace_from_tag(node.tag), + enum_list=enum_list[:-2] or "", + ) + ) + + def handle_attributes(self, depth, node, file_out): + """Handle the attributes parsed from the xml file""" + + allowed_attr = [ + "name", + "type", + "units", + "nameType", + "recommended", + "optional", + "minOccurs", + "maxOccurs", + "deprecated", + ] + + name = "" + node_attr = node.attrib + if "name" in node_attr: + pass + else: + raise ValueError("Attribute must have an name key.") + rm_key_list = [] + # Maintain order: name and type in form name(type) or (type)name that come first + for key, val in node_attr.items(): + if key == "name": + name = val + rm_key_list.append(key) + + for key in rm_key_list: + del node_attr[key] + + file_out.write( + "{indent}{escapesymbol}{name}:\n".format( + indent=depth * DEPTH_SIZE, escapesymbol=r"\@", name=name + ) + ) + + tmp_dict = {} + exists_dict = {} + for key, val in node_attr.items(): + # As both 'minOccurs', 'maxOccurs' and optionality move to the 'exists' + if key in ["minOccurs", "maxOccurs", "optional", "recommended", "required"]: + if "exists" not in tmp_dict: + tmp_dict["exists"] = [] + self.handle_exists(exists_dict, key, val) + elif key == "units": + tmp_dict["unit"] = val + else: + tmp_dict[key] = val + if key not in allowed_attr: + raise ValueError( + f"An attribute ({key}) has been found that is not allowed." + f"The allowed attr is {allowed_attr}." 
+                )
+
+        has_min_max = False
+        has_opt_reco_requ = False
+        if exists_dict:
+            for key, val in exists_dict.items():
+                if key in ["minOccurs", "maxOccurs"]:
+                    tmp_dict["exists"] = tmp_dict["exists"] + val
+                    has_min_max = True
+                elif key in ["optional", "recommended", "required"]:
+                    tmp_dict["exists"] = key
+                    has_opt_reco_requ = True
+        if has_min_max and has_opt_reco_requ:
+            raise ValueError(
+                "Optionality 'exists' can take its values either from ['minOccurs',"
+                " 'maxOccurs'] or from ['optional', 'recommended', 'required'],"
+                " but not from both groups together. Please check the"
+                " attributes."
+            )
+
+        depth_ = depth + 1
+        for key, val in tmp_dict.items():
+            # Increase the depth size inside handle_map...() to write text with one
+            # more indentation.
+            file_out.write(
+                f"{depth_ * DEPTH_SIZE}{key}: "
+                f"{handle_mapping_char(val, depth_ + 1, False)}\n"
+            )
+
+    def handel_link(self, depth, node, file_out):
+        """
+        Handle link elements of nxdl
+        """
+
+        possible_link_attrs = ["name", "target", "napimount"]
+        node_attr = node.attrib
+        # Handle special cases
+        if "name" in node_attr:
+            file_out.write(
+                "{indent}{name}(link):\n".format(
+                    indent=depth * DEPTH_SIZE, name=node_attr["name"] or ""
+                )
+            )
+            del node_attr["name"]
+
+        depth_ = depth + 1
+        # Handle general cases
+        for attr_key, val in node_attr.items():
+            if attr_key in possible_link_attrs:
+                file_out.write(
+                    "{indent}{attr}: {value}\n".format(
+                        indent=depth_ * DEPTH_SIZE, attr=attr_key, value=val
+                    )
+                )
+            else:
+                raise ValueError(
+                    f"An unexpected attribute '{attr_key}' of link has been found. "
+                    f"At this moment the allowed keys are {possible_link_attrs}."
+                )
+
+    def handel_choice(self, depth, node, file_out):
+        """
+        Handle choice element which is a parent node of group.
+        """
+
+        possible_attr = []
+
+        node_attr = node.attrib
+        # Handle special cases
+        if "name" in node_attr:
+            file_out.write(
+                "{indent}{attr}(choice): \n".format(
+                    indent=depth * DEPTH_SIZE, attr=node_attr["name"]
+                )
+            )
+            del node_attr["name"]
+
+        depth_ = depth + 1
+        # Taking care of general attributes. No attributes have been found yet,
+        # but the list could be extended in the future.
+        for attr, value in node_attr.items():
+            if attr in possible_attr:
+                file_out.write(
+                    "{indent}{attr}: {value}\n".format(
+                        indent=depth_ * DEPTH_SIZE, attr=attr, value=value
+                    )
+                )
+            else:
+                raise ValueError(
+                    f"An unexpected attribute '{attr}' of 'choice' has been found. "
+                    f"At this moment the attributes allowed for choice are {possible_attr}."
+                )
+
+    def handel_comment(self, depth, node, file_out):
+        """
+        Collect a comment element and pass it to the write_out function
+        """
+        indent = depth * DEPTH_SIZE
+        text = self.comvert_to_ymal_comment(indent, node.text)
+        self.write_out(indent, text, file_out)
+        self.is_last_element_comment = True
+
+    def recursion_in_xml_tree(self, depth, xml_tree, output_yml, verbose):
+        """
+        Descend to the lower level of the xml tree. If we are in the symbols branch,
+        the recursive behaviour is not triggered, as we have already handled the
+        symbols' children.
+        """
+
+        tree = xml_tree["tree"]
+        node = xml_tree["node"]
+        for child in list(node):
+            xml_tree_children = {"tree": tree, "node": child}
+            self.xmlparse(output_yml, xml_tree_children, depth, verbose)
+
+    # pylint: disable=too-many-branches, too-many-statements
+    def xmlparse(self, output_yml, xml_tree, depth, verbose):
+        """
+        Main of the nxdl2yaml converter.
+ It parses XML tree, then prints recursively each level of the tree + """ + tree = xml_tree["tree"] + node = xml_tree["node"] + if verbose: + sys.stdout.write(f"Node tag: {remove_namespace_from_tag(node.tag)}\n") + sys.stdout.write(f"Attributes: {node.attrib}\n") + with open(output_yml, "a", encoding="utf-8") as file_out: + tag = remove_namespace_from_tag(node.tag) + if tag == "definition": + self.found_definition = True + self.handle_definition(node) + # Taking care of root level doc and symbols + remove_cmnt_n = None + last_comment = "" + for child in list(node): + tag_tmp = remove_namespace_from_tag(child.tag) + if tag_tmp == CMNT_TAG and self.include_comment: + last_comment = self.comvert_to_ymal_comment( + depth * DEPTH_SIZE, child.text + ) + remove_cmnt_n = child + if tag_tmp == "doc": + self.store_root_level_comments("root_doc", last_comment) + last_comment = "" + self.handle_root_level_doc(child) + node.remove(child) + if remove_cmnt_n is not None: + node.remove(remove_cmnt_n) + remove_cmnt_n = None + if tag_tmp == "symbols": + self.store_root_level_comments("symbols", last_comment) + last_comment = "" + self.handle_symbols(depth, child) + node.remove(child) + if remove_cmnt_n is not None: + node.remove(remove_cmnt_n) + remove_cmnt_n = None + + if tag == ("doc") and depth != 1: + parent = get_node_parent_info(tree, node)[0] + doc_parent = remove_namespace_from_tag(parent.tag) + if doc_parent != "item": + self.handle_not_root_level_doc( + depth, text=node.text, tag=node.tag, file_out=file_out + ) + + if self.found_definition is True and self.root_level_doc: + self.print_root_level_info(depth, file_out) + # End of print root-level definitions in file + if tag in ("field", "group") and depth != 0: + self.handle_group_or_field(depth, node, file_out) + if tag == ("enumeration"): + self.handle_enumeration(depth, node, file_out) + if tag == ("attribute"): + self.handle_attributes(depth, node, file_out) + if tag == ("dimensions"): + self.handle_dimension(depth, node, file_out) + if tag == ("link"): + self.handel_link(depth, node, file_out) + if tag == ("choice"): + self.handel_choice(depth, node, file_out) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(depth, node, file_out) + depth += 1 + # Write nested nodes + self.recursion_in_xml_tree(depth, xml_tree, output_yml, verbose) + + +def compare_niac_and_my(tree, tree2, verbose, node, root_no_duplicates): + """This function creates two trees with Niac XML file and My XML file. + The main aim is to compare the two trees and create a new one that is the + union of the two initial trees. + """ + root = tree.getroot() + root2 = tree2.getroot() + attrs_list_niac = [] + for nodo in root.iter(node): + attrs_list_niac.append(nodo.attrib) + if verbose: + sys.stdout.write("Attributes found in Niac file: \n") + sys.stdout.write(str(attrs_list_niac) + "\n") + sys.stdout.write(" \n") + sys.stdout.write("Started merging of Niac and My file... 
\n") + for elem in root.iter(node): + if verbose: + sys.stdout.write("- Niac element inserted: \n") + sys.stdout.write(str(elem.attrib) + "\n") + index = get_node_parent_info(tree, elem)[1] + root_no_duplicates.insert(index, elem) + + for elem2 in root2.iter(node): + index = get_node_parent_info(tree2, elem2)[1] + if elem2.attrib not in attrs_list_niac: + if verbose: + sys.stdout.write("- My element inserted: \n") + sys.stdout.write(str(elem2.attrib) + "\n") + root_no_duplicates.insert(index, elem2) + + if verbose: + sys.stdout.write(" \n") + return root_no_duplicates diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py new file mode 100644 index 000000000..664f68748 --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py @@ -0,0 +1,1242 @@ +#!/usr/bin/env python3 +"""Creates an instantiated NXDL schema XML tree by walking the dictionary nest + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import textwrap +import xml.etree.ElementTree as ET +from xml.dom import minidom + +import yaml + +from ..utils import nxdl_utils as pynxtools_nxlib +from .comment_collector import CommentCollector +from .nyaml2nxdl_helper import LineLoader +from .nyaml2nxdl_helper import cleaning_empty_lines +from .nyaml2nxdl_helper import get_yaml_escape_char_reverter_dict +from .nyaml2nxdl_helper import nx_name_type_resolving +from .nyaml2nxdl_helper import remove_namespace_from_tag + +# pylint: disable=too-many-lines, global-statement, invalid-name +DOM_COMMENT = ( + "\n" + "# NeXus - Neutron and X-ray Common Data Format\n" + "# \n" + "# Copyright (C) 2014-2022 NeXus International Advisory Committee (NIAC)\n" + "# \n" + "# This library is free software; you can redistribute it and/or\n" + "# modify it under the terms of the GNU Lesser General Public\n" + "# License as published by the Free Software Foundation; either\n" + "# version 3 of the License, or (at your option) any later version.\n" + "#\n" + "# This library is distributed in the hope that it will be useful,\n" + "# but WITHOUT ANY WARRANTY; without even the implied warranty of\n" + "# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU\n"
+    "# Lesser General Public License for more details.\n"
+    "#\n"
+    "# You should have received a copy of the GNU Lesser General Public\n"
+    "# License along with this library; if not, write to the Free Software\n"
+    "# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA\n"
+    "#\n"
+    "# For further information, see http://www.nexusformat.org\n"
+)
+NX_CLSS = pynxtools_nxlib.get_nx_classes()
+NX_NEW_DEFINED_CLASSES = ["NX_COMPLEX"]
+NX_TYPE_KEYS = pynxtools_nxlib.get_nx_attribute_type()
+NX_ATTR_IDNT = "\\@"
+NX_UNIT_IDNT = "unit"
+DEPTH_SIZE = "  "
+NX_UNIT_TYPES = pynxtools_nxlib.get_nx_units()
+COMMENT_BLOCKS: CommentCollector
+CATEGORY = ""  # Definition would be either 'base' or 'application'
+
+
+def check_for_dom_comment_in_yaml():
+    """Check whether the yaml file has a dom comment, or whether the hard-coded
+    dom comment needs to be used."""
+    signature_keyword_list = [
+        "NeXus",
+        "GNU Lesser General Public",
+        "Free Software Foundation",
+        "Copyright (C)",
+        "WITHOUT ANY WARRANTY",
+    ]
+
+    # Check for a dom comment in the first five comments
+    dom_comment = ""
+    dom_comment_ind = 1
+    for ind, comnt in enumerate(COMMENT_BLOCKS[0:5]):
+        cmnt_list = comnt.get_comment_text()
+        if len(cmnt_list) == 1:
+            text = cmnt_list[0]
+        else:
+            continue
+        dom_comment = text
+        dom_comment_ind = ind
+        for keyword in signature_keyword_list:
+            if keyword not in text:
+                dom_comment = ""
+                break
+        if dom_comment:
+            break
+
+    # Deactivate the root dom_comment, so that the corresponding comment would not be
+    # considered as a comment for the definition xml element.
+    if dom_comment:
+        COMMENT_BLOCKS.remove_comment(dom_comment_ind)
+
+    return dom_comment
+
+
+def yml_reader(inputfile):
+    """
+    This function launches the LineLoader class.
+    It parses the yaml in a dict and extends it with line tag keys for each key of the dict.
+    """
+    global COMMENT_BLOCKS
+    with open(inputfile, "r", encoding="utf-8") as plain_text_yaml:
+        loader = LineLoader(plain_text_yaml)
+        loaded_yaml = loader.get_single_data()
+    COMMENT_BLOCKS = CommentCollector(inputfile, loaded_yaml)
+    COMMENT_BLOCKS.extract_all_comment_blocks()
+    dom_cmnt_frm_yaml = check_for_dom_comment_in_yaml()
+    global DOM_COMMENT
+    if dom_cmnt_frm_yaml:
+        DOM_COMMENT = dom_cmnt_frm_yaml
+
+    if "category" not in loaded_yaml.keys():
+        raise ValueError(
+            "All definitions should be either 'base' or 'application' category. "
+            "No category has been found."
+        )
+    global CATEGORY
+    CATEGORY = loaded_yaml["category"]
+    return loaded_yaml
+
+
+def check_for_default_attribute_and_value(xml_element):
+    """NeXus groups, fields and attributes might have default xml attributes and
+    values that must be set. For example: 'optional' is 'true' by default for base
+    classes and 'false' otherwise.
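+
+    For illustration (hypothetical field name): in a 'base' category definition, a bare
+    <field name="data"/> with no occurrence attributes would receive optional="true",
+    while in an 'application' category definition it would receive optional="false".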
+ """ + + # base:Default attributes and value for all elements of base class except dimension element + base_attr_to_val = {"optional": "true"} + + # application: Default attributes and value for all elements of application class except + # dimension element + application_attr_to_val = {"optional": "false"} + + # Default attributes and value for dimension element + base_dim_attr_to_val = {"required": "false"} + application_dim_attr_to_val = {"required": "true"} + + # Eligible tag for default attr and value + elegible_tag = ["group", "field", "attribute"] + + def set_default_attribute(xml_elem, default_attr_to_val): + for deflt_attr, deflt_val in default_attr_to_val.items(): + if ( + deflt_attr not in xml_elem.attrib + and "maxOccurs" not in xml_elem.attrib + and "minOccurs" not in xml_elem.attrib + and "recommended" not in xml_elem.attrib + ): + xml_elem.set(deflt_attr, deflt_val) + + for child in list(xml_element): + # skiping comment 'function' that mainly collect comment from yaml file. + if not isinstance(child.tag, str): + continue + tag = remove_namespace_from_tag(child.tag) + + if tag == "dim" and CATEGORY == "base": + set_default_attribute(child, base_dim_attr_to_val) + if tag == "dim" and CATEGORY == "application": + set_default_attribute(child, application_dim_attr_to_val) + if tag in elegible_tag and CATEGORY == "base": + set_default_attribute(child, base_attr_to_val) + if tag in elegible_tag and CATEGORY == "application": + set_default_attribute(child, application_attr_to_val) + check_for_default_attribute_and_value(child) + + +def yml_reader_nolinetag(inputfile): + """ + pyyaml based parsing of yaml file in python dict + """ + with open(inputfile, "r", encoding="utf-8") as stream: + parsed_yaml = yaml.safe_load(stream) + return parsed_yaml + + +def check_for_skiped_attributes(component, value, allowed_attr=None, verbose=False): + """ + Check for any attributes have been skipped or not. + NOTE: We should keep in mind about 'doc' + """ + block_tag = ["enumeration"] + if value: + for attr, val in value.items(): + if attr in ["doc"]: + continue + if "__line__" in attr or attr in block_tag: + continue + line_number = f"__line__{attr}" + if verbose: + print(f"__line__ : {value[line_number]}") + if ( + not isinstance(val, dict) + and "\\@" not in attr + and attr not in allowed_attr + and "NX" not in attr + and val + ): + raise ValueError( + f"An attribute '{attr}' in part '{component}' has been found" + f". Please check arround line '{value[line_number]}. At this " + f"moment. The allowed attrbutes are {allowed_attr}" + ) + + +def format_nxdl_doc(string): + """NeXus format for doc string""" + string = check_for_mapping_char_other(string) + formatted_doc = "" + if "\n" not in string: + if len(string) > 80: + wrapped = textwrap.TextWrapper( + width=80, break_long_words=False, replace_whitespace=False + ) + string = "\n".join(wrapped.wrap(string)) + formatted_doc = "\n" + f"{string}" + else: + text_lines = string.split("\n") + text_lines = cleaning_empty_lines(text_lines) + formatted_doc += "\n" + "\n".join(text_lines) + if not formatted_doc.endswith("\n"): + formatted_doc += "\n" + return formatted_doc + + +def check_for_mapping_char_other(text): + """ + Check for mapping char \':\' which does not be passed through yaml library. + Then replace it by ':'. + """ + if not text: + text = "" + text = str(text) + if text == "True": + text = "true" + if text == "False": + text = "false" + # Some escape char is not valid in yaml libray which is written while writting + # yaml file. 
In the time of writting nxdl revert to that escape char. + escape_reverter = get_yaml_escape_char_reverter_dict() + for key, val in escape_reverter.items(): + if key in text: + text = text.replace(key, val) + return str(text).strip() + + +def xml_handle_doc(obj, value: str, line_number=None, line_loc=None): + """This function creates a 'doc' element instance, and appends it to an existing element""" + # global comment_bolcks + doc_elemt = ET.SubElement(obj, "doc") + text = format_nxdl_doc(check_for_mapping_char_other(value)).strip() + # To keep the doc middle of doc tag. + doc_elemt.text = f"\n{text}\n" + if line_loc is not None and line_number is not None: + xml_handle_comment(obj, line_number, line_loc, doc_elemt) + + +def xml_handle_units(obj, value): + """This function creates a 'units' element instance, and appends it to an existing element""" + obj.set("units", str(value)) + + +# pylint: disable=too-many-branches +def xml_handle_exists(dct, obj, keyword, value): + """ + This function creates an 'exists' element instance, and appends it to an existing element + """ + line_number = f"__line__{keyword}" + assert ( + value is not None + ), f"Line {dct[line_number]}: exists argument must not be None !" + if isinstance(value, list): + if len(value) == 4 and value[0] == "min" and value[2] == "max": + obj.set("minOccurs", str(value[1])) + if str(value[3]) != "infty": + obj.set("maxOccurs", str(value[3])) + else: + obj.set("maxOccurs", "unbounded") + elif len(value) == 2 and value[0] == "min": + obj.set("minOccurs", str(value[1])) + elif len(value) == 2 and value[0] == "max": + obj.set("maxOccurs", str(value[1])) + elif len(value) == 4 and value[0] == "max" and value[2] == "min": + obj.set("minOccurs", str(value[3])) + if str(value[1]) != "infty": + obj.set("maxOccurs", str(value[3])) + else: + obj.set("maxOccurs", "unbounded") + elif len(value) == 4 and (value[0] != "min" or value[2] != "max"): + raise ValueError( + f"Line {dct[line_number]}: exists keyword" + f"needs to go either with an optional [recommended] list with two " + f"entries either [min, ] or [max, ], or a list of four " + f"entries [min, , max, ] !" + ) + else: + raise ValueError( + f"Line {dct[line_number]}: exists keyword " + f"needs to go either with optional, recommended, a list with two " + f"entries either [min, ] or [max, ], or a list of four " + f"entries [min, , max, ] !" + ) + else: + # This clause take optional in all concept except dimension where 'required' key is allowed + # not the 'optional' key. + if value == "optional": + obj.set("optional", "true") + elif value == "recommended": + obj.set("recommended", "true") + elif value == "required": + obj.set("optional", "false") + else: + obj.set("minOccurs", "0") + + +# pylint: disable=too-many-branches, too-many-locals, too-many-statements +def xml_handle_group(dct, obj, keyword, value, verbose=False): + """ + The function deals with group instances + """ + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + list_of_attr = [ + "name", + "type", + "nameType", + "deprecated", + "optional", + "recommended", + "exists", + "unit", + ] + l_bracket = -1 + r_bracket = -1 + if keyword.count("(") == 1: + l_bracket = keyword.index("(") + if keyword.count(")") == 1: + r_bracket = keyword.index(")") + + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if not keyword_name and not keyword_type: + raise ValueError("A group must have both value and name. 
Check for group.") + grp = ET.SubElement(obj, "group") + + if l_bracket == 0 and r_bracket > 0: + grp.set("type", keyword_type) + if keyword_name: + grp.set("name", keyword_name) + elif l_bracket > 0: + grp.set("name", keyword_name) + if keyword_type: + grp.set("type", keyword_type) + else: + grp.set("name", keyword_name) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if "__line__" in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == "doc": + xml_handle_doc(grp, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr == "exists" and vval: + xml_handle_exists(value, grp, attr, vval) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, grp) + elif attr == "unit": + xml_handle_units(grp, vval) + xml_handle_comment(obj, line_number, line_loc, grp) + elif attr in list_of_attr and not isinstance(vval, dict) and vval: + validate_field_attribute_and_value(attr, vval, list_of_attr, value) + grp.set(attr, check_for_mapping_char_other(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, grp) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes("group", value, list_of_attr, verbose) + if isinstance(value, dict) and value != {}: + recursive_build(grp, value, verbose) + + +def xml_handle_dimensions(dct, obj, keyword, value: dict): + """ + This function creates a 'dimensions' element instance, and appends it to an existing element + + NOTE: we could create xml_handle_dim() function. + But, the dim elements in yaml file is defined as 'dim =[[index, value]]' + but dim has other attributes such as 'ref' and also might have doc as chlid. + so in that sense 'dim' should have come as dict keeping attributes and child as members of + dict. + Regarding this situation all the attributes of 'dimensions' and child 'doc' has been + included here. + + Other attributes, except 'index' and 'value', of 'dim' comes under nested dict named + 'dim_parameter: + incr:[...]' + """ + + possible_dimension_attrs = ["rank"] # nxdl attributes + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + assert "dim" in value.keys(), ( + f"Line {line_loc}: No dim as child of dimension has " f"been found." + ) + xml_handle_comment(obj, line_number, line_loc) + dims = ET.SubElement(obj, "dimensions") + # Consider all the childs under dimension is dim element and + # its attributes + + rm_key_list = [] + rank = "" + for key, val in value.items(): + if "__line__" in key: + continue + line_number = f"__line__{key}" + line_loc = value[line_number] + if key == "rank": + rank = val or "" + if isinstance(rank, int) and rank < 0: + raise ValueError( + f"Dimension must have some info about rank which is not " + f"available. 
Please check arround Line: {dct[line_number]}" + ) + dims.set(key, str(val)) + rm_key_list.append(key) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, dims) + # Check dimension doc and handle it + elif key == "doc" and isinstance(val, str): + xml_handle_doc(dims, val, line_number, line_loc) + rm_key_list.append(key) + rm_key_list.append(line_number) + elif key in possible_dimension_attrs and not isinstance(val, dict): + dims.set(key, str(val)) + rm_key_list.append(key) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, dims) + + for key in rm_key_list: + del value[key] + + xml_handle_dim_from_dimension_dict(dct, dims, keyword, value, rank=False) + + if isinstance(value, dict) and value != {}: + recursive_build(dims, value, verbose=None) + + +# pylint: disable=too-many-locals, too-many-arguments +def xml_handle_dim_from_dimension_dict( + dct, dims_obj, keyword, value, rank, verbose=False +): + """ + Handling dim element. + NOTE: The inputs 'keyword' and 'value' are as input for xml_handle_dimensions + function. please also read note in xml_handle_dimensions. + """ + + possible_dim_attrs = ["ref", "incr", "refindex", "required"] + + # Some attributes might have equivalent name e.g. 'required' is correct one and + # 'optional' could be another name. Then change attribute to the correct one. + wrong_to_correct_attr = [("optional", "required")] + header_line_number = f"__line__{keyword}" + dim_list = [] + rm_key_list = [] + # NOTE: dim doc and other attributes except 'index' and 'value' will come as list of value + # under dim_parameters + if not value: + return + rank = "" + # pylint: disable=too-many-nested-blocks + for attr, vvalue in value.items(): + if "__line__" in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + # dim comes in precedence + if attr == "dim": + # dim consists of list of [index, value] + llist_ind_value = vvalue + assert isinstance(llist_ind_value, list), ( + f"Line {value[line_number]}: dim" f"argument not a list !" + ) + xml_handle_comment(dims_obj, line_number, line_loc) + if isinstance(rank, int) and rank > 0: + assert rank == len(llist_ind_value), ( + f"Wrong dimension rank check around Line {dct[header_line_number]}.\n" + f"Line {[dct[header_line_number]]} rank value {rank} " + f"is not the same as dim array = " + f"{len(llist_ind_value)}." 
+                )
+            # Taking care of index and value that come as a list of lists
+            for dim_ind_val in llist_ind_value:
+                dim = ET.SubElement(dims_obj, "dim")
+
+                # Taking care of multidimensions or rank
+                if len(dim_ind_val) >= 1 and dim_ind_val[0]:
+                    dim.set("index", str(dim_ind_val[0]))
+                if len(dim_ind_val) == 2 and dim_ind_val[1]:
+                    dim.set("value", str(dim_ind_val[1]))
+                dim_list.append(dim)
+            rm_key_list.append(attr)
+            rm_key_list.append(line_number)
+        elif attr == "dim_parameters" and isinstance(vvalue, dict):
+            xml_handle_comment(dims_obj, line_number, line_loc)
+            for kkkey, vvval in vvalue.items():
+                if "__line__" in kkkey:
+                    continue
+                cmnt_number = f"__line__{kkkey}"
+                cmnt_loc = vvalue[cmnt_number]
+                # Check whether any prohibited attributes have been added
+                for tuple_wng_crt in wrong_to_correct_attr:
+                    if kkkey == tuple_wng_crt[0]:
+                        raise ValueError(
+                            f"{cmnt_loc}: Attribute '{kkkey}' is prohibited, use "
+                            f"'{tuple_wng_crt[1]}'."
+                        )
+                if kkkey == "doc" and dim_list:
+                    # doc comes as a list of docs
+                    for i, dim in enumerate(dim_list):
+                        if isinstance(vvval, list) and i < len(vvval):
+                            xml_handle_doc(dim, vvval[i], cmnt_number, cmnt_loc)
+                        # Skip when not all dims have a doc
+                        elif isinstance(vvval, list) and i >= len(vvval):
+                            pass
+                else:
+                    for i, dim in enumerate(dim_list):
+                        # All attributes of dims come as a list
+                        if isinstance(vvval, list) and i < len(vvval):
+                            tmp_val = vvval[i]
+                            dim.set(kkkey, str(tmp_val))
+                        # Skip when not all dims have that attribute
+                        elif isinstance(vvval, list) and i >= len(vvval):
+                            pass
+                        # All dims may share the same value for the same attribute
+                        elif not isinstance(vvval, list):
+                            tmp_val = vvval
+                            dim.set(kkkey, str(tmp_val))
+            rm_key_list.append(attr)
+            rm_key_list.append(line_number)
+        else:
+            raise ValueError(
+                f"Found an unexpected block other than 'dim' and 'dim_parameters'. "
+                f"Please check around line {line_loc}."
+            )
+
+    for key in rm_key_list:
+        del value[key]
+
+    check_for_skiped_attributes("dim", value, possible_dim_attrs, verbose)
+
+
+def xml_handle_enumeration(dct, obj, keyword, value, verbose):
+    """This function creates an 'enumeration' element instance.
+
+    Two cases are handled:
+    1) the items are in a list
+    2) the items are dictionaries and may contain a nested doc
+    """
+    line_number = f"__line__{keyword}"
+    line_loc = dct[line_number]
+    xml_handle_comment(obj, line_number, line_loc)
+    enum = ET.SubElement(obj, "enumeration")
+
+    assert (
+        value is not None
+    ), f"Line {line_loc}: enumeration must bear at least an argument !"
+    assert (
+        len(value) >= 1
+    ), f"Line {dct[line_number]}: enumeration must not be an empty list!"
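+    # For illustration (hypothetical items): a plain yaml list such as
+    #     enumeration: [1, 2, 3]
+    # yields only <item value="..."/> children, whereas the dict form
+    #     enumeration:
+    #       1:
+    #         doc: docs for item 1
+    # additionally nests a <doc> under the corresponding <item>.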
+ if isinstance(value, list): + for element in value: + itm = ET.SubElement(enum, "item") + itm.set("value", str(element)) + if isinstance(value, dict) and value != {}: + for element in value.keys(): + if "__line__" not in element: + itm = ET.SubElement(enum, "item") + itm.set("value", str(element)) + if isinstance(value[element], dict): + recursive_build(itm, value[element], verbose) + + +# pylint: disable=unused-argument +def xml_handle_link(dct, obj, keyword, value, verbose): + """ + If we have an NXDL link we decode the name attribute from (link)[:-6] + """ + + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + possible_attrs = ["name", "target", "napimount"] + name = keyword[:-6] + link_obj = ET.SubElement(obj, "link") + link_obj.set("name", str(name)) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if "__line__" in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == "doc": + xml_handle_doc(link_obj, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr in possible_attrs and not isinstance(vval, dict): + if vval: + link_obj.set(attr, str(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, link_obj) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes("link", value, possible_attrs, verbose) + + if isinstance(value, dict) and value != {}: + recursive_build(link_obj, value, verbose=None) + + +def xml_handle_choice(dct, obj, keyword, value, verbose=False): + """ + Build choice xml elements. That consists of groups. + """ + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + # Add attributes in possible if new attributs have been added nexus definition. + possible_attr = [] + choice_obj = ET.SubElement(obj, "choice") + # take care of special attributes + name = keyword[:-8] + choice_obj.set("name", name) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if "__line__" in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == "doc": + xml_handle_doc(choice_obj, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr in possible_attr and not isinstance(vval, dict): + if vval: + choice_obj.set(attr, str(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, choice_obj) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes("choice", value, possible_attr, verbose) + + if isinstance(value, dict) and value != {}: + recursive_build(choice_obj, value, verbose=None) + + +def xml_handle_symbols(dct, obj, keyword, value: dict): + """Handle a set of NXDL symbols as a child to obj""" + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + assert ( + len(list(value.keys())) >= 1 + ), f"Line {line_loc}: symbols table must not be empty !" 
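+    # For illustration (hypothetical symbol name), a yaml table such as
+    #     symbols:
+    #       doc: Symbols used in this definition.
+    #       n_x: Number of pixels along x.
+    # becomes a <symbols> element with a <doc> child and a <symbol name="n_x">
+    # whose own <doc> holds the description.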
+ xml_handle_comment(obj, line_number, line_loc) + syms = ET.SubElement(obj, "symbols") + if "doc" in value.keys(): + line_number = "__line__doc" + line_loc = value[line_number] + xml_handle_comment(syms, line_number, line_loc) + doctag = ET.SubElement(syms, "doc") + doctag.text = "\n" + textwrap.fill(value["doc"], width=70) + "\n" + rm_key_list = [] + for kkeyword, vvalue in value.items(): + if "__line__" in kkeyword: + continue + if kkeyword != "doc": + line_number = f"__line__{kkeyword}" + line_loc = value[line_number] + xml_handle_comment(syms, line_number, line_loc) + assert vvalue is not None and isinstance( + vvalue, str + ), f"Line {line_loc}: put a comment in doc string !" + sym = ET.SubElement(syms, "symbol") + sym.set("name", str(kkeyword)) + # sym_doc = ET.SubElement(sym, 'doc') + xml_handle_doc(sym, vvalue) + rm_key_list.append(kkeyword) + rm_key_list.append(line_number) + # sym_doc.text = '\n' + textwrap.fill(vvalue, width=70) + '\n' + for key in rm_key_list: + del value[key] + + +def check_keyword_variable(verbose, dct, keyword, value): + """ + Check whether both keyword_name and keyword_type are empty, + and complains if it is the case + """ + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if verbose: + sys.stdout.write( + f"{keyword_name}({keyword_type}): value type is {type(value)}\n" + ) + if keyword_name == "" and keyword_type == "": + line_number = f"__line__{keyword}" + raise ValueError(f"Line {dct[line_number]}: found an improper yaml key !") + + +def helper_keyword_type(kkeyword_type): + """ + This function is returning a value of keyword_type if it belong to NX_TYPE_KEYS + """ + if kkeyword_type in NX_TYPE_KEYS: + return kkeyword_type + return None + + +def verbose_flag(verbose, keyword, value): + """ + Verbose stdout printing for nested levels of yaml file, if verbose flag is active + """ + if verbose: + sys.stdout.write(f" key:{keyword}; value type is {type(value)}\n") + + +def xml_handle_attributes(dct, obj, keyword, value, verbose): + """Handle the attributes found connected to attribute field""" + + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + # list of possible attribute of xml attribute elementsa + attr_attr_list = [ + "name", + "type", + "unit", + "nameType", + "optional", + "recommended", + "minOccurs", + "maxOccurs", + "deprecated", + "exists", + ] + # as an attribute identifier + keyword_name, keyword_typ = nx_name_type_resolving(keyword) + line_number = f"__line__{keyword}" + if verbose: + print(f"__line__ : {dct[line_number]}") + if keyword_name == "" and keyword_typ == "": + raise ValueError(f"Line {dct[line_number]}: found an improper yaml key !") + elemt_obj = ET.SubElement(obj, "attribute") + elemt_obj.set("name", keyword_name[2:]) + if keyword_typ: + elemt_obj.set("type", keyword_typ) + + rm_key_list = [] + if value and value: + # taking care of attributes of attributes + for attr, attr_val in value.items(): + if "__line__" in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr in ["doc", *attr_attr_list] and not isinstance(attr_val, dict): + if attr == "unit": + elemt_obj.set(f"{attr}s", str(value[attr])) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, elemt_obj) + elif attr == "exists" and attr_val: + xml_handle_exists(value, elemt_obj, attr, attr_val) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, 
line_loc, elemt_obj) + elif attr == "doc": + xml_handle_doc( + elemt_obj, format_nxdl_doc(attr_val), line_number, line_loc + ) + rm_key_list.append(attr) + rm_key_list.append(line_number) + else: + elemt_obj.set(attr, check_for_mapping_char_other(attr_val)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, elemt_obj) + + for key in rm_key_list: + del value[key] + # Check cor skiped attribute + check_for_skiped_attributes("Attribute", value, attr_attr_list, verbose) + if value: + recursive_build(elemt_obj, value, verbose) + + +def validate_field_attribute_and_value(v_attr, vval, allowed_attribute, value): + """ + Check for any attributes that comes with invalid name, + and invalid value. + """ + + # check for empty val + if not isinstance(vval, dict) and not str(vval): # check for empty value + line_number = f"__line__{v_attr}" + raise ValueError( + f"In a field a valid attrbute ('{v_attr}') found that is not stored." + f" Please check arround line {value[line_number]}" + ) + + # The bellow elements might come as child element + skipped_child_name = ["doc", "dimension", "enumeration", "choice", "exists"] + # check for invalid key or attributes + if ( + v_attr not in [*skipped_child_name, *allowed_attribute] + and "__line__" not in v_attr + and not isinstance(vval, dict) + and "(" not in v_attr # skip only groups and field that has name and type + and "\\@" not in v_attr + ): # skip nexus attributes + line_number = f"__line__{v_attr}" + raise ValueError( + f"In a field or group a invalid attribute ('{v_attr}') or child has found." + f" Please check arround line {value[line_number]}." + ) + + +def xml_handle_fields(obj, keyword, value, line_annot, line_loc, verbose=False): + """ + Handle a field in yaml file. + When a keyword is NOT: + symbol, + NX baseclass member, + attribute (\\@), + doc, + enumerations, + dimension, + exists, + then the not empty keyword_name is a field! + This simple function will define a new node of xml tree + """ + # List of possible attributes of xml elements + allowed_attr = [ + "name", + "type", + "nameType", + "unit", + "minOccurs", + "long_name", + "axis", + "signal", + "deprecated", + "axes", + "exists", + "data_offset", + "interpretation", + "maxOccurs", + "primary", + "recommended", + "optional", + "stride", + ] + + xml_handle_comment(obj, line_annot, line_loc) + l_bracket = -1 + r_bracket = -1 + if keyword.count("(") == 1: + l_bracket = keyword.index("(") + if keyword.count(")") == 1: + r_bracket = keyword.index(")") + + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if not keyword_type and not keyword_name: + raise ValueError("Check for name or type in field.") + elemt_obj = ET.SubElement(obj, "field") + + # type come first + if l_bracket == 0 and r_bracket > 0: + elemt_obj.set("type", keyword_type) + if keyword_name: + elemt_obj.set("name", keyword_name) + elif l_bracket > 0: + elemt_obj.set("name", keyword_name) + if keyword_type: + elemt_obj.set("type", keyword_type) + else: + elemt_obj.set("name", keyword_name) + + if value: + rm_key_list = [] + # In each each if clause apply xml_handle_comment(), to collect + # comments on that yaml line. 
+        for attr, vval in value.items():
+            if "__line__" in attr:
+                continue
+            line_number = f"__line__{attr}"
+            line_loc = value[line_number]
+            if attr == "doc":
+                xml_handle_doc(
+                    elemt_obj,
+                    vval,
+                    line_number,
+                    line_loc,
+                )
+                rm_key_list.append(attr)
+                rm_key_list.append(line_number)
+            elif attr == "exists" and vval:
+                xml_handle_exists(value, elemt_obj, attr, vval)
+                rm_key_list.append(attr)
+                rm_key_list.append(line_number)
+                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
+            elif attr == "unit":
+                xml_handle_units(elemt_obj, vval)
+                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
+            elif attr in allowed_attr and not isinstance(vval, dict) and vval:
+                validate_field_attribute_and_value(attr, vval, allowed_attr, value)
+                elemt_obj.set(attr, check_for_mapping_char_other(vval))
+                rm_key_list.append(attr)
+                rm_key_list.append(line_number)
+                xml_handle_comment(obj, line_number, line_loc, elemt_obj)
+
+        for key in rm_key_list:
+            del value[key]
+        # Check for skipped attributes
+        check_for_skiped_attributes("field", value, allowed_attr, verbose)
+
+    if isinstance(value, dict) and value != {}:
+        recursive_build(elemt_obj, value, verbose)
+
+
+def xml_handle_comment(
+    obj: ET.Element,
+    line_annotation: str,
+    line_loc_no: int,
+    xml_ele: ET.Element = None,
+    is_def_cmnt: bool = False,
+):
+    """
+    Add an xml comment: check for comments that have the same 'line_annotation'
+    (e.g. __line__data) and the same line_loc_no (e.g. 30). After that, it
+    does one of three tasks:
+    1. Returns a list of comment texts (multiple members if the element has multiple comments).
+    2. Rearranges the comment element and xml_ele so that the comment comes first.
+    3. Appends the comment element when no xml_ele is provided.
+    """
+
+    line_info = (line_annotation, int(line_loc_no))
+    if line_info in COMMENT_BLOCKS:  # noqa: F821
+        cmnt = COMMENT_BLOCKS.get_coment_by_line_info(line_info)  # noqa: F821
+        cmnt_text = cmnt.get_comment_text()
+
+        if is_def_cmnt:
+            return cmnt_text
+        if xml_ele is not None:
+            obj.remove(xml_ele)
+            for string in cmnt_text:
+                si_comnt = ET.Comment(string)
+                obj.append(si_comnt)
+            obj.append(xml_ele)
+        elif not is_def_cmnt and xml_ele is None:
+            for string in cmnt_text:
+                si_comnt = ET.Comment(string)
+                obj.append(si_comnt)
+        else:
+            raise ValueError("Please provide correct parameter values.")
+    return ""
+
+
+def recursive_build(obj, dct, verbose):
+    """obj is the current node of the XML tree where we want to append to,
+    dct is a dictionary object which represents the content of a child to obj.
+    dct may contain further dictionary nests, representing NXDL groups,
+    which trigger recursive processing.
+    NXDL fields may contain attributes but trigger no recursion, so attributes are leafs.
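+
+    For illustration (hypothetical keys), a nest such as
+        {'instrument(NXinstrument)': {'doc': '...'}}
+    appends a <group name="instrument" type="NXinstrument"> to obj and recurses
+    into its members, while keys like 'doc', 'unit', 'enumeration', 'dimensions'
+    and 'exists' dispatch to their dedicated handlers.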
+
+    """
+    for keyword, value in iter(dct.items()):
+        if "__line__" in keyword:
+            continue
+        line_number = f"__line__{keyword}"
+        line_loc = dct[line_number]
+        keyword_name, keyword_type = nx_name_type_resolving(keyword)
+        check_keyword_variable(verbose, dct, keyword, value)
+        if verbose:
+            sys.stdout.write(
+                f"keyword_name:{keyword_name} keyword_type {keyword_type}\n"
+            )
+
+        if keyword[-6:] == "(link)":
+            xml_handle_link(dct, obj, keyword, value, verbose)
+        elif keyword[-8:] == "(choice)":
+            xml_handle_choice(dct, obj, keyword, value)
+        # The below xml_handle_symbols clause is for symbols that come under fields
+        # or attributes. Root-level symbols are handled inside nyaml2nxdl().
+        elif keyword_type == "" and keyword_name == "symbols":
+            xml_handle_symbols(dct, obj, keyword, value)
+
+        elif (keyword_type in NX_CLSS) or (
+            keyword_type not in [*NX_TYPE_KEYS, "", *NX_NEW_DEFINED_CLASSES]
+        ):
+            # we can be sure we need to instantiate a new group
+            xml_handle_group(dct, obj, keyword, value, verbose)
+
+        elif keyword_name[0:2] == NX_ATTR_IDNT:  # check if obj qualifies
+            xml_handle_attributes(dct, obj, keyword, value, verbose)
+        elif keyword == "doc":
+            xml_handle_doc(obj, value, line_number, line_loc)
+        elif keyword == NX_UNIT_IDNT:
+            xml_handle_units(obj, value)
+        elif keyword == "enumeration":
+            xml_handle_enumeration(dct, obj, keyword, value, verbose)
+
+        elif keyword == "dimensions":
+            xml_handle_dimensions(dct, obj, keyword, value)
+
+        elif keyword == "exists":
+            xml_handle_exists(dct, obj, keyword, value)
+        # Handles fields, e.g. AXISNAME
+        elif keyword_name != "" and "__line__" not in keyword_name:
+            xml_handle_fields(obj, keyword, value, line_number, line_loc, verbose)
+        else:
+            raise ValueError(
+                f"An unfamiliar type of element {keyword} has been found which "
+                f"could not be resolved. Check around line {dct[line_number]}."
+            )
+
+
+def pretty_print_xml(xml_root, output_xml, def_comments=None):
+    """
+    Print a better human-readable, indented and formatted xml file using
+    built-in libraries and a preceding XML processing instruction
+    """
+    dom = minidom.parseString(ET.tostring(xml_root, encoding="utf-8", method="xml"))
+    proc_instruction = dom.createProcessingInstruction(
+        "xml-stylesheet", 'type="text/xsl" href="nxdlformat.xsl"'
+    )
+    dom_comment = dom.createComment(DOM_COMMENT)
+    root = dom.firstChild
+    dom.insertBefore(proc_instruction, root)
+    dom.insertBefore(dom_comment, root)
+
+    if def_comments:
+        for string in def_comments:
+            def_comt_ele = dom.createComment(string)
+            dom.insertBefore(def_comt_ele, root)
+
+    xml_string = dom.toprettyxml(indent=1 * DEPTH_SIZE, newl="\n", encoding="UTF-8")
+    with open("tmp.xml", "wb") as file_tmp:
+        file_tmp.write(xml_string)
+    flag = False
+    # Re-indent multi-line <doc> blocks so that their text lines up under the tag
+    with open("tmp.xml", "r", encoding="utf-8") as file_out:
+        with open(output_xml, "w", encoding="utf-8") as file_out_mod:
+            for i in file_out.readlines():
+                if "<doc>" not in i and "</doc>" not in i and flag is False:
+                    file_out_mod.write(i)
+                elif "<doc>" in i and "</doc>" in i:
+                    file_out_mod.write(i)
+                elif "<doc>" in i and "</doc>" not in i:
+                    flag = True
+                    white_spaces = len(i) - len(i.lstrip())
+                    file_out_mod.write(i)
+                elif "<doc>" not in i and "</doc>" not in i and flag is True:
+                    file_out_mod.write((white_spaces + 5) * " " + i)
+                elif "<doc>" not in i and "</doc>" in i and flag is True:
+                    file_out_mod.write(white_spaces * " " + i)
+                    flag = False
+    os.remove("tmp.xml")
+
+
+# pylint: disable=too-many-statements
+def nyaml2nxdl(input_file: str, out_file, verbose: bool):
+    """
+    Main of the nyaml2nxdl converter: creates the XML tree, namespace and
+    schema definitions, then evaluates a dictionary nest of groups recursively,
+    with fields or (their) attributes as children of the groups
+    """
+
+    def_attributes = [
+        "deprecated",
+        "ignoreExtraGroups",
+        "category",
+        "type",
+        "ignoreExtraFields",
+        "ignoreExtraAttributes",
+        "restricts",
+    ]
+    yml_appdef = yml_reader(input_file)
+    def_cmnt_text = []
+    if verbose:
+        sys.stdout.write(f"input-file: {input_file}\n")
+        sys.stdout.write(
+            "application/base contains the following root-level entries:\n"
+        )
+        sys.stdout.write(str(yml_appdef.keys()))
+    xml_root = ET.Element("definition", {})
+    assert (
+        "category" in yml_appdef.keys()
+    ), "Required root-level keyword category is missing!"
+    assert yml_appdef["category"] in [
+        "application",
+        "base",
+    ], "Only application and base are valid categories!"
+    assert "doc" in yml_appdef.keys(), "Required root-level keyword doc is missing!"
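+    # For illustration, a minimal accepted root level could look like (hypothetical
+    # definition name):
+    #     category: application
+    #     doc: A short description of the definition.
+    #     NXmy_def(NXobject):
+    #       ...
+    # where the NX... key supplies the definition's 'name' and 'extends' attributes,
+    # as resolved below.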
+ + name_extends = "" + yml_appdef_copy = yml_appdef.copy() + for kkey, vvalue in yml_appdef_copy.items(): + if "__line__" in kkey: + continue + line_number = f"__line__{kkey}" + line_loc_no = yml_appdef[line_number] + if not isinstance(vvalue, dict) and kkey in def_attributes: + xml_root.set(kkey, str(vvalue) or "") + cmnt_text = xml_handle_comment( + xml_root, line_number, line_loc_no, is_def_cmnt=True + ) + def_cmnt_text += cmnt_text if cmnt_text else [] + + del yml_appdef[line_number] + del yml_appdef[kkey] + # Taking care or name and extends + elif "NX" in kkey: + # Tacking the attribute order but the correct value will be stored later + # check for name first or type first if (NXobject)NXname then type first + l_bracket_ind = kkey.rfind("(") + r_bracket_ind = kkey.rfind(")") + if l_bracket_ind == 0: + extend = kkey[1:r_bracket_ind] + name = kkey[r_bracket_ind + 1 :] + xml_root.set("extends", extend) + xml_root.set("name", name) + elif l_bracket_ind > 0: + name = kkey[0:l_bracket_ind] + extend = kkey[l_bracket_ind + 1 : r_bracket_ind] + xml_root.set("name", name) + xml_root.set("extends", extend) + else: + name = kkey + xml_root.set("name", name) + xml_root.set("extends", "NXobject") + cmnt_text = xml_handle_comment( + xml_root, line_number, line_loc_no, is_def_cmnt=True + ) + def_cmnt_text += cmnt_text if cmnt_text else [] + + name_extends = kkey + + if "type" not in xml_root.attrib: + xml_root.set("type", "group") + # Taking care of namespaces + namespaces = { + "xmlns": "http://definition.nexusformat.org/nxdl/3.1", + "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "xsi:schemaLocation": "http://definition.nexusformat.org/nxdl/3.1 ../nxdl.xsd", + } + for key, ns_ in namespaces.items(): + xml_root.attrib[key] = ns_ + # Taking care of Symbols elements + if "symbols" in yml_appdef.keys(): + xml_handle_symbols(yml_appdef, xml_root, "symbols", yml_appdef["symbols"]) + + del yml_appdef["symbols"] + del yml_appdef["__line__symbols"] + + assert ( + isinstance(yml_appdef["doc"], str) and yml_appdef["doc"] != "" + ), "Doc \ +has to be a non-empty string!" + + line_number = "__line__doc" + line_loc_no = yml_appdef[line_number] + xml_handle_doc(xml_root, yml_appdef["doc"], line_number, line_loc_no) + + del yml_appdef["doc"] + + root_keys = 0 + for key in yml_appdef.keys(): + if "__line__" not in key: + root_keys += 1 + extra_key = key + + assert root_keys == 1, ( + f"Accepting at most keywords: category, doc, symbols, and NX... " + f"at root-level! check key at root level {extra_key}" + ) + + assert ( + "NX" in name_extends and len(name_extends) > 2 + ), "NX \ +keyword has an invalid pattern, or is too short!" + # Taking care if definition has empty content + if yml_appdef[name_extends]: + recursive_build(xml_root, yml_appdef[name_extends], verbose) + # Taking care of comments that comes at the end of file that is might not be intended for + # any nxdl elements. + if COMMENT_BLOCKS[-1].has_post_comment: # noqa: F821 + post_comment = COMMENT_BLOCKS[-1] # noqa: F821 + (lin_annot, line_loc) = post_comment.get_line_info() + xml_handle_comment(xml_root, lin_annot, line_loc) + + # Note: Just to keep the functionality if we need this functionality later. 
+ default_attr = False + if default_attr: + check_for_default_attribute_and_value(xml_root) + pretty_print_xml(xml_root, out_file, def_cmnt_text) + if verbose: + sys.stdout.write("Parsed YAML to NXDL successfully\n") diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py new file mode 100644 index 000000000..c55f5da7a --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +"""Main file of yaml2nxdl tool. +Users create NeXus instances by writing a YAML file +which details a hierarchy of data/metadata elements + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# Yaml library does not except the keys (escapechar "\t" and yaml separator ":") +# So the corresponding value is to skip them and +# and also carefull about this order +import hashlib + +from yaml.composer import Composer +from yaml.constructor import Constructor +from yaml.loader import Loader +from yaml.nodes import ScalarNode +from yaml.resolver import BaseResolver + +# NOTE: If any one change one of the bellow dict please change it for both +ESCAPE_CHAR_DICT_IN_YAML = {"\t": " ", "':'": ":"} + +ESCAPE_CHAR_DICT_IN_XML = {" ": "\t", "':'": ":"} + + +def remove_namespace_from_tag(tag): + """Helper function to remove the namespace from an XML tag.""" + + return tag.split("}")[-1] + + +class LineLoader(Loader): # pylint: disable=too-many-ancestors + """ + LineLoader parses a yaml into a python dictionary extended with extra items. + The new items have as keys __line__ and as values the yaml file line number + """ + + def compose_node(self, parent, index): + # the line number where the previous token has ended (plus empty lines) + node = Composer.compose_node(self, parent, index) + node.__line__ = self.line + 1 + return node + + def construct_mapping(self, node, deep=False): + node_pair_lst = node.value + node_pair_lst_for_appending = [] + + for key_node in node_pair_lst: + shadow_key_node = ScalarNode( + tag=BaseResolver.DEFAULT_SCALAR_TAG, + value="__line__" + key_node[0].value, + ) + shadow_value_node = ScalarNode( + tag=BaseResolver.DEFAULT_SCALAR_TAG, value=key_node[0].__line__ + ) + node_pair_lst_for_appending.append((shadow_key_node, shadow_value_node)) + + node.value = node_pair_lst + node_pair_lst_for_appending + return Constructor.construct_mapping(self, node, deep=deep) + + +def get_yaml_escape_char_dict(): + """Get escape char and the way to skip them in yaml.""" + return ESCAPE_CHAR_DICT_IN_YAML + + +def get_yaml_escape_char_reverter_dict(): + """To revert yaml escape char in xml constructor from yaml.""" + + return ESCAPE_CHAR_DICT_IN_XML + + +def type_check(nx_type): + """ + Check for nexus type if type is NX_CHAR get '' or get as it is. 
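+
+    For example: type_check("NX_FLOAT") returns "(NX_FLOAT)", while
+    type_check("NX_CHAR") and type_check("") both return "".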
+ """ + + if nx_type in ["NX_CHAR", ""]: + nx_type = "" + else: + nx_type = f"({nx_type})" + return nx_type + + +def get_node_parent_info(tree, node): + """ + Return tuple of (parent, index) where: + parent = node of parent within tree + index = index of node under parent + """ + + parent_map = {c: p for p in tree.iter() for c in p} + parent = parent_map[node] + return parent, list(parent).index(node) + + +def cleaning_empty_lines(line_list): + """ + Cleaning up empty lines on top and bottom. + """ + if not isinstance(line_list, list): + line_list = line_list.split("\n") if "\n" in line_list else [""] + + # Clining up top empty lines + while True: + if line_list[0].strip(): + break + line_list = line_list[1:] + if len(line_list) == 0: + line_list.append("") + return line_list + + # Clining bottom empty lines + while True: + if line_list[-1].strip(): + break + line_list = line_list[0:-1] + if len(line_list) == 0: + line_list.append("") + return line_list + + return line_list + + +def nx_name_type_resolving(tmp): + """ + extracts the eventually custom name {optional_string} + and type {nexus_type} from a YML section string. + YML section string syntax: optional_string(nexus_type) + """ + if tmp.count("(") == 1 and tmp.count(")") == 1: + # we can safely assume that every valid YML key resolves + # either an nx_ (type, base, candidate) class contains only 1 '(' and ')' + index_start = tmp.index("(") + index_end = tmp.index(")", index_start + 1) + typ = tmp[index_start + 1 : index_end] + nam = tmp.replace("(" + typ + ")", "") + return nam, typ + + # or a name for a member + typ = "" + nam = tmp + return nam, typ + + +def get_sha256_hash(file_name): + """Generate a sha256_hash for a given file.""" + sha_hash = hashlib.sha256() + + with open( + file=file_name, + mode="rb", + ) as file_obj: + # Update hash for each 4k block of bytes + for b_line in iter(lambda: file_obj.read(4096), b""): + sha_hash.update(b_line) + return sha_hash.hexdigest() + + +def extend_yamlfile_with_comment(yaml_file, file_to_be_appended, top_lines_list=None): + """Extend yaml file by the file_to_be_appended as comment.""" + + with open(yaml_file, mode="a+", encoding="utf-8") as f1_obj: + if top_lines_list: + for line in top_lines_list: + f1_obj.write(line) + + with open(file_to_be_appended, mode="r", encoding="utf-8") as f2_obj: + lines = f2_obj.readlines() + for line in lines: + f1_obj.write(f"# {line}") + + +def separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, sep_xml): + """Separate the provided yaml file into yaml, nxdl and hash if yaml was extended with + nxdl at the end of yaml by + '\n# ++++++++++++++++++++++++++++++++++ SHA HASH \ + ++++++++++++++++++++++++++++++++++\n' + # ' + """ + sha_hash = "" + with open(yaml_file, "r", encoding="utf-8") as inp_file: + lines = inp_file.readlines() + # file to write yaml part + with open(sep_yaml, "w", encoding="utf-8") as yml_f_ob, open( + sep_xml, "w", encoding="utf-8" + ) as xml_f_ob: + last_line = "" + write_on_yaml = True + for ind, line in enumerate(lines): + if ind == 0: + last_line = line + # Write in file when ensured that the nest line is not with '++ SHA HASH ++' + elif "++ SHA HASH ++" not in line and write_on_yaml: + yml_f_ob.write(last_line) + last_line = line + elif "++ SHA HASH ++" in line: + write_on_yaml = False + last_line = "" + elif not write_on_yaml and not last_line: + # The first line of xml file has been found. Onward write lines directly + # into xml file. 
+                if not sha_hash:
+                    sha_hash = line.split("# ", 1)[-1].strip()
+                else:
+                    xml_f_ob.write(line[2:])
+        # If the yaml file does not contain a hash for the nxdl part,
+        # the last line may still be pending.
+        if last_line:
+            yml_f_ob.write(last_line)
+
+    return sha_hash
diff --git a/dev_tools/tests/test_nyaml2nxdl.py b/dev_tools/tests/test_nyaml2nxdl.py
new file mode 100644
index 000000000..792d8d462
--- /dev/null
+++ b/dev_tools/tests/test_nyaml2nxdl.py
@@ -0,0 +1,27 @@
+import os
+
+from click.testing import CliRunner
+
+from ..nyaml2nxdl import nyaml2nxdl as conv
+from ..utils.nxdl_utils import find_definition_file
+
+# import subprocess
+
+
+def test_conversion():
+    root = find_definition_file("NXentry")
+    # subprocess.run(["python3","-m","dev_tools.nyaml2nxdl.nyaml2nxdl","--input-file",root])
+    result = CliRunner().invoke(conv.launch_tool, ["--input-file", root])
+    assert result.exit_code == 0
+    yaml = root[:-9] + "_parsed.yaml"
+    # subprocess.run(["python3","-m","dev_tools.nyaml2nxdl.nyaml2nxdl","--input-file",yaml])
+    result = CliRunner().invoke(conv.launch_tool, ["--input-file", yaml])
+    assert result.exit_code == 0
+    new_root = yaml[:-4] + "nxdl.xml"
+    with open(root, encoding="utf-8", mode="r") as tmp_f:
+        root_content = tmp_f.readlines()
+    with open(new_root, encoding="utf-8", mode="r") as tmp_f:
+        new_root_content = tmp_f.readlines()
+    assert root_content == new_root_content
+    os.remove(yaml)
+    os.remove(new_root)
diff --git a/dev_tools/utils/nxdl_utils.py b/dev_tools/utils/nxdl_utils.py
new file mode 100644
index 000000000..efba439be
--- /dev/null
+++ b/dev_tools/utils/nxdl_utils.py
@@ -0,0 +1,854 @@
+# pylint: disable=too-many-lines
+"""Parse NeXus definition files
+"""
+
+import os
+import textwrap
+import xml.etree.ElementTree as ET
+from functools import lru_cache
+from glob import glob
+
+
+class NxdlAttributeError(Exception):
+    """An exception raised when an Nxdl attribute is not found."""
+
+
+def get_app_defs_names():
+    """Returns all the AppDef names without their extension: .nxdl.xml"""
+    app_def_path_glob = (
+        f"{get_nexus_definitions_path()}{os.sep}applications{os.sep}*.nxdl*"
+    )
+    contrib_def_path_glob = (
+        f"{get_nexus_definitions_path()}{os.sep}"
+        f"contributed_definitions{os.sep}*.nxdl*"
+    )
+    files = sorted(glob(app_def_path_glob)) + sorted(glob(contrib_def_path_glob))
+    return [os.path.basename(file).split(".")[0] for file in files] + ["NXroot"]
+
+
+@lru_cache(maxsize=None)
+def get_xml_root(file_path):
+    """Reduce I/O time by caching the parsed XML root"""
+
+    return ET.parse(file_path).getroot()
+
+
+def get_nexus_definitions_path():
+    """Check the NEXUS_DEF_PATH environment variable;
+    if it is not set, fall back to the local 'definitions' checkout this module lives in."""
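+    # For example (hypothetical path): export NEXUS_DEF_PATH=/path/to/definitions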
+    try:  # either given by the environment variable
+        return os.environ["NEXUS_DEF_PATH"]
+    except KeyError:  # or it should be available locally under the dir 'definitions'
+        local_dir = os.path.abspath(os.path.dirname(__file__))
+        return os.path.join(local_dir, f"..{os.sep}..")
+
+
+def get_hdf_root(hdf_node):
+    """Get the root HDF5 node"""
+    node = hdf_node
+    while node.name != "/":
+        node = node.parent
+    return node
+
+
+def get_hdf_parent(hdf_info):
+    """Get the parent of an hdf_node in an hdf_info"""
+    if "hdf_path" not in hdf_info:
+        return hdf_info["hdf_node"].parent
+    node = (
+        get_hdf_root(hdf_info["hdf_node"])
+        if "hdf_root" not in hdf_info
+        else hdf_info["hdf_root"]
+    )
+    for child_name in hdf_info["hdf_path"].split("/"):
+        node = node[child_name]
+    return node
+
+
+def get_parent_path(hdf_name):
+    """Get parent path"""
+    return "/".join(hdf_name.split("/")[:-1])
+
+
+def get_hdf_info_parent(hdf_info):
+    """Get the hdf_info for the parent of an hdf_node in an hdf_info"""
+    if "hdf_path" not in hdf_info:
+        return {"hdf_node": hdf_info["hdf_node"].parent}
+    node = (
+        get_hdf_root(hdf_info["hdf_node"])
+        if "hdf_root" not in hdf_info
+        else hdf_info["hdf_root"]
+    )
+    for child_name in hdf_info["hdf_path"].split("/")[1:-1]:
+        node = node[child_name]
+    return {"hdf_node": node, "hdf_path": get_parent_path(hdf_info["hdf_path"])}
+
+
+def get_nx_class(nxdl_elem):
+    """Get the nexus class for a NXDL node"""
+    if "category" in nxdl_elem.attrib.keys():
+        return None
+    try:
+        return nxdl_elem.attrib["type"]
+    except KeyError:
+        return "NX_CHAR"
+
+
+def get_nx_namefit(hdf_name, name, name_any=False):
+    """Checks how well an HDF5 node name fits a child of the NXDL element.
+    Leading uppercase letters can be replaced by an arbitrary name, but an
+    uppercase-to-lowercase match is preferred, so such a match is counted
+    as a measure of the fit"""
+    if name == hdf_name:
+        return len(name) * 2
+    # count leading capitals
+    counting = 0
+    while counting < len(name) and name[counting].upper() == name[counting]:
+        counting += 1
+    if (
+        name_any
+        or counting == len(name)
+        or (counting > 0 and hdf_name.endswith(name[counting:]))
+    ):  # if potential fit
+        # count the matching chars
+        fit = 0
+        for i in range(min(counting, len(hdf_name))):
+            if hdf_name[i].upper() == name[i]:
+                fit += 1
+            else:
+                break
+        if fit == min(counting, len(hdf_name)):  # accept only full fits as better fits
+            return fit
+        return 0
+    return -1  # no fit
+
+
+def get_nx_classes():
+    """Read base classes from the NeXus definition folder.
+    Check each file in base_classes, applications, contributed_definitions.
+    If its category attribute is 'base', then it is added to the list."""
+    base_classes = sorted(
+        glob(os.path.join(get_nexus_definitions_path(), "base_classes", "*.nxdl.xml"))
+    )
+    applications = sorted(
+        glob(os.path.join(get_nexus_definitions_path(), "applications", "*.nxdl.xml"))
+    )
+    contributed = sorted(
+        glob(
+            os.path.join(
+                get_nexus_definitions_path(), "contributed_definitions", "*.nxdl.xml"
+            )
+        )
+    )
+    nx_clss = []
+    for nexus_file in base_classes + applications + contributed:
+        root = get_xml_root(nexus_file)
+        if root.attrib["category"] == "base":
+            nx_clss.append(str(nexus_file[nexus_file.rindex(os.sep) + 1 :])[:-9])
+    nx_clss = sorted(nx_clss)
+    return nx_clss
+
+
+def get_nx_units():
+    """Read unit kinds from the NeXus definition/nxdlTypes.xsd file"""
+    filepath = f"{get_nexus_definitions_path()}{os.sep}nxdlTypes.xsd"
+    root = get_xml_root(filepath)
+    units_and_type_list = []
+    for child in root:
+        for i in child.attrib.values():
+            units_and_type_list.append(i)
+    flag = False
+    for line in units_and_type_list:
+        if line == "anyUnitsAttr":
+            flag = True
+            nx_units = []
+        elif "NX" in line and flag is True:
+            nx_units.append(line)
+        elif line == "primitiveType":
+            flag = False
+        else:
+            pass
+    return nx_units
+
+
+def get_nx_attribute_type():
+    """Read attribute types from the NeXus definition/nxdlTypes.xsd file"""
+    filepath = get_nexus_definitions_path() + "/nxdlTypes.xsd"
+    root = get_xml_root(filepath)
+    units_and_type_list = []
+    for child in root:
+        for i in child.attrib.values():
+            units_and_type_list.append(i)
+    flag = False
+    for line in units_and_type_list:
+        if line == "primitiveType":
+            flag = True
+            nx_types = []
+        elif "NX" in line and flag is True:
+            nx_types.append(line)
+        elif line == "anyUnitsAttr":
+            flag = False
+        else:
+            pass
+    return nx_types
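+
+
+# Both functions above scan nxdlTypes.xsd; e.g. get_nx_units() yields unit
+# categories such as "NX_LENGTH" or "NX_TIME".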
+
+
+def get_node_name(node):
+    """Node - xml node. Returns the html documentation name.
+    Either as specified by the 'name' attribute or taken from the type (nx_class).
+    Note that if only the class name is available, the NX prefix is removed and
+    the string is converted to UPPER case."""
+    if "name" in node.attrib.keys():
+        name = node.attrib["name"]
+    else:
+        name = node.attrib["type"]
+        if name.startswith("NX"):
+            name = name[2:].upper()
+    return name
+
+
+def belongs_to(nxdl_elem, child, name, class_type=None, hdf_name=None):
+    """Checks if an HDF5 node name corresponds to a child of the NXDL element.
+    Leading uppercase letters can be replaced by an arbitrary name, but an
+    uppercase-to-lowercase match is preferred"""
+    if class_type and get_nx_class(child) != class_type:
+        return False
+    act_htmlname = get_node_name(child)
+    chk_name = hdf_name or name
+    if act_htmlname == chk_name:
+        return True
+    if not hdf_name:  # search for name fits is only allowed for hdf_nodes
+        return False
+    try:  # check if nameType allows a different name
+        name_any = bool(child.attrib["nameType"] == "any")
+    except KeyError:
+        name_any = False
+    params = [act_htmlname, chk_name, name_any, nxdl_elem, child, name]
+    return belongs_to_capital(params)
+
+
+def belongs_to_capital(params):
+    """Continue the name check for names starting with an uppercase letter"""
+    (act_htmlname, chk_name, name_any, nxdl_elem, child, name) = params
+    # or starts with capital and no reserved words used
+    if (
+        (name_any or "A" <= act_htmlname[0] <= "Z")
+        and name != "doc"
+        and name != "enumeration"
+    ):
+        fit = get_nx_namefit(chk_name, act_htmlname, name_any)  # check if name fits
+        if fit < 0:
+            return False
+        for child2 in nxdl_elem:
+            if (
+                get_local_name_from_xml(child) != get_local_name_from_xml(child2)
+                or get_node_name(child2) == act_htmlname
+            ):
+                continue
+            # check if the name of another sibling fits better
+            name_any2 = (
+                "nameType" in child2.attrib.keys()
+                and child2.attrib["nameType"] == "any"
+            )
+            fit2 = get_nx_namefit(chk_name, get_node_name(child2), name_any2)
+            if fit2 > fit:
+                return False
+        # accept this fit
+        return True
+    return False
+
+
+def get_local_name_from_xml(element):
+    """Helper function to extract the element tag without the namespace."""
+    return element.tag[element.tag.rindex("}") + 1 :]
+
+
+def get_own_nxdl_child_reserved_elements(child, name, nxdl_elem):
+    """Check the reserved elements, like doc or enumeration"""
+    if get_local_name_from_xml(child) == "doc" and name == "doc":
+        if nxdl_elem.get("nxdlbase"):
+            child.set("nxdlbase", nxdl_elem.get("nxdlbase"))
+            child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class"))
+            child.set("nxdlpath", nxdl_elem.get("nxdlpath") + "/doc")
+        return child
+    if get_local_name_from_xml(child) == "enumeration" and name == "enumeration":
+        if nxdl_elem.get("nxdlbase"):
+            child.set("nxdlbase", nxdl_elem.get("nxdlbase"))
+            child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class"))
+            child.set("nxdlpath", nxdl_elem.get("nxdlpath") + "/enumeration")
+        return child
+    return False
+
+
+def get_own_nxdl_child_base_types(child, class_type, nxdl_elem, name, hdf_name):
+    """Check the base types: group, field, or attribute"""
+    if get_local_name_from_xml(child) == "group":
+        if (
+            class_type is None or (class_type and get_nx_class(child) == class_type)
+        ) and belongs_to(nxdl_elem, child, name, class_type, hdf_name):
+            if nxdl_elem.get("nxdlbase"):
+                child.set("nxdlbase", nxdl_elem.get("nxdlbase"))
+                child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class"))
+                child.set(
+                    "nxdlpath", nxdl_elem.get("nxdlpath") + "/" + get_node_name(child)
+                )
+            return child
+    if get_local_name_from_xml(child) == "field" and belongs_to(
+        nxdl_elem, child, name, None, hdf_name
+    ):
nxdl_elem.get("nxdlbase"): + child.set("nxdlbase", nxdl_elem.get("nxdlbase")) + child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class")) + child.set( + "nxdlpath", nxdl_elem.get("nxdlpath") + "/" + get_node_name(child) + ) + return child + if get_local_name_from_xml(child) == "attribute" and belongs_to( + nxdl_elem, child, name, None, hdf_name + ): + if nxdl_elem.get("nxdlbase"): + child.set("nxdlbase", nxdl_elem.get("nxdlbase")) + child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class")) + child.set( + "nxdlpath", nxdl_elem.get("nxdlpath") + "/" + get_node_name(child) + ) + return child + return False + + +def get_own_nxdl_child( + nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None +): + """Checks if an NXDL child node fits to the specific name (either nxdl or hdf) + name - nxdl name + class_type - nxdl type or hdf classname (for groups, it is obligatory) + hdf_name - hdf name""" + for child in nxdl_elem: + if "name" in child.attrib and child.attrib["name"] == name: + if nxdl_elem.get("nxdlbase"): + child.set("nxdlbase", nxdl_elem.get("nxdlbase")) + child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class")) + child.set( + "nxdlpath", nxdl_elem.get("nxdlpath") + "/" + get_node_name(child) + ) + return child + for child in nxdl_elem: + if "name" in child.attrib and child.attrib["name"] == name: + child.set("nxdlbase", nxdl_elem.get("nxdlbase")) + return child + + for child in nxdl_elem: + result = get_own_nxdl_child_reserved_elements(child, name, nxdl_elem) + if result is not False: + return result + if nexus_type and get_local_name_from_xml(child) != nexus_type: + continue + result = get_own_nxdl_child_base_types( + child, class_type, nxdl_elem, name, hdf_name + ) + if result is not False: + return result + return None + + +def find_definition_file(bc_name): + """find the nxdl file corresponding to the name. + Note that it first checks in contributed and goes beyond only if no contributed found + """ + bc_filename = None + for nxdl_folder in ["contributed_definitions", "base_classes", "applications"]: + if os.path.exists( + f"{get_nexus_definitions_path()}{os.sep}" + f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml" + ): + bc_filename = ( + f"{get_nexus_definitions_path()}{os.sep}" + f"{nxdl_folder}{os.sep}{bc_name}.nxdl.xml" + ) + break + return bc_filename + + +def get_nxdl_child( + nxdl_elem, name, class_type=None, hdf_name=None, nexus_type=None, go_base=True +): # pylint: disable=too-many-arguments + """Get the NXDL child node corresponding to a specific name + (e.g. of an HDF5 node,or of a documentation) note that if child is not found in application + definition, it also checks for the base classes""" + # search for possible fits for hdf_nodes : skipped + # only exact hits are returned when searching an nxdl child + own_child = get_own_nxdl_child(nxdl_elem, name, class_type, hdf_name, nexus_type) + if own_child is not None: + return own_child + if not go_base: + return None + bc_name = get_nx_class(nxdl_elem) # check in the base class, app def or contributed + if bc_name[2] == "_": # filter primitive types + return None + if ( + bc_name == "group" + ): # Check if it is the root element. 
+        bc_name = "NXroot"
+    bc_filename = find_definition_file(bc_name)
+    if not bc_filename:
+        raise ValueError("nxdl file not found in definitions folder!")
+    bc_obj = ET.parse(bc_filename).getroot()
+    bc_obj.set("nxdlbase", bc_filename)
+    if "category" in bc_obj.attrib:
+        bc_obj.set("nxdlbase_class", bc_obj.attrib["category"])
+    bc_obj.set("nxdlpath", "")
+    return get_own_nxdl_child(bc_obj, name, class_type, hdf_name, nexus_type)
+
+
+def get_required_string(nxdl_elem):
+    """Check for being REQUIRED, RECOMMENDED, OPTIONAL, NOT IN SCHEMA"""
+    if nxdl_elem is None:
+        return "<<NOT IN SCHEMA>>"
+    is_optional = (
+        "optional" in nxdl_elem.attrib.keys() and nxdl_elem.attrib["optional"] == "true"
+    )
+    is_minoccurs = (
+        "minOccurs" in nxdl_elem.attrib.keys() and nxdl_elem.attrib["minOccurs"] == "0"
+    )
+    is_recommended = (
+        "recommended" in nxdl_elem.attrib.keys()
+        and nxdl_elem.attrib["recommended"] == "true"
+    )
+
+    if is_recommended:
+        return "<<RECOMMENDED>>"
+    if is_optional or is_minoccurs:
+        return "<<OPTIONAL>>"
+    # default optionality: in BASE CLASSES it is true; in APPLICATIONS it is false
+    try:
+        if nxdl_elem.get("nxdlbase_class") == "base":
+            return "<<OPTIONAL>>"
+    except TypeError:
+        return "<<REQUIRED>>"
+    return "<<REQUIRED>>"
+
+
+# below there are some functions used in the get_nxdl_doc function:
+def write_doc_string(logger, doc, attr):
+    """Simple function that prints a line to the logger if doc exists"""
+    if doc:
+        logger.debug("@" + attr + " [NX_CHAR]")
+    return logger, doc, attr
+
+
+def try_find_units(logger, elem, nxdl_path, doc, attr):
+    """Try to find if units are defined inside the field in the NXDL element,
+    otherwise try to find if units are defined as a child of the NXDL element."""
+    try:  # try to find if units is defined inside the field in the NXDL element
+        unit = elem.attrib[attr]
+        if doc:
+            logger.debug(get_node_concept_path(elem) + "@" + attr + " [" + unit + "]")
+        elem = None
+        nxdl_path.append(attr)
+    except (
+        KeyError
+    ):  # otherwise try to find if units is defined as a child of the NXDL element
+        orig_elem = elem
+        elem = get_nxdl_child(elem, attr, nexus_type="attribute")
+        if elem is not None:
+            if doc:
+                logger.debug(
+                    get_node_concept_path(orig_elem)
+                    + "@"
+                    + attr
+                    + " - ["
+                    + get_nx_class(elem)
+                    + "]"
+                )
+            nxdl_path.append(elem)
+        else:  # if no units category was defined in NXDL:
+            if doc:
+                logger.debug(
+                    get_node_concept_path(orig_elem)
+                    + "@"
+                    + attr
+                    + " - REQUIRED, but undefined unit category"
+                )
+            nxdl_path.append(attr)
+    return logger, elem, nxdl_path, doc, attr
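+
+
+# Note: the "<<...>>" strings above (e.g. "<<REQUIRED>>") are human-readable
+# requiredness markers for log and documentation output; they are not NXDL syntax.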
+
+
+def check_attr_name_nxdl(param):
+    """Check for ATTRIBUTENAME_units in NXDL (normal).
+    If it is not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE
+    is in the SCHEMA, but no units category was defined."""
+    (logger, elem, nxdl_path, doc, attr, req_str) = param
+    orig_elem = elem
+    elem2 = get_nxdl_child(elem, attr, nexus_type="attribute")
+    if elem2 is not None:  # check for ATTRIBUTENAME_units in NXDL (normal)
+        elem = elem2
+        if doc:
+            logger.debug(
+                get_node_concept_path(orig_elem)
+                + "@"
+                + attr
+                + " - ["
+                + get_nx_class(elem)
+                + "]"
+            )
+        nxdl_path.append(elem)
+    else:
+        # if not defined, check for ATTRIBUTENAME to see if the ATTRIBUTE
+        # is in the SCHEMA, but no units category was defined
+        elem2 = get_nxdl_child(elem, attr[:-6], nexus_type="attribute")
+        if elem2 is not None:
+            req_str = "<<RECOMMENDED>>"
+            if doc:
+                logger.debug(
+                    get_node_concept_path(orig_elem)
+                    + "@"
+                    + attr
+                    + " - RECOMMENDED, but undefined unit category"
+                )
+            nxdl_path.append(attr)
+        else:  # otherwise: NOT IN SCHEMA
+            elem = elem2
+            if doc:
+                logger.debug(
+                    get_node_concept_path(orig_elem)
+                    + "@"
+                    + attr
+                    + " - IS NOT IN SCHEMA"
+                )
+    return logger, elem, nxdl_path, doc, attr, req_str
+
+
+def try_find_default(
+    logger, orig_elem, elem, nxdl_path, doc, attr
+):  # pylint: disable=too-many-arguments
+    """Try to find if a default is defined as a child of the NXDL element"""
+    if elem is not None:
+        if doc:
+            logger.debug(
+                get_node_concept_path(orig_elem)
+                + "@"
+                + attr
+                + " - ["
+                + get_nx_class(elem)
+                + "]"
+            )
+        nxdl_path.append(elem)
+    else:  # if no default category was defined in NXDL:
+        if doc:
+            logger.debug(get_node_concept_path(orig_elem) + "@" + attr + " - [NX_CHAR]")
+        nxdl_path.append(attr)
+    return logger, elem, nxdl_path, doc, attr
+
+
+def other_attrs(
+    logger, orig_elem, elem, nxdl_path, doc, attr
+):  # pylint: disable=too-many-arguments
+    """Handle remaining attributes"""
+    if elem is not None:
+        if doc:
+            logger.debug(
+                get_node_concept_path(orig_elem)
+                + "@"
+                + attr
+                + " - ["
+                + get_nx_class(elem)
+                + "]"
+            )
+        nxdl_path.append(elem)
+    else:
+        if doc:
+            logger.debug(
+                get_node_concept_path(orig_elem) + "@" + attr + " - IS NOT IN SCHEMA"
+            )
+    return logger, elem, nxdl_path, doc, attr
+
+
+def get_node_concept_path(elem):
+    """Get the short version of nxdlbase:nxdlpath"""
+    return str(elem.get("nxdlbase").split("/")[-1] + ":" + elem.get("nxdlpath"))
+
+
+def get_doc(node, ntype, nxhtml, nxpath):
+    """Get documentation"""
+    # URL for html documentation
+    anchor = ""
+    for n_item in nxpath:
+        anchor += n_item.lower() + "-"
+    anchor = (
+        "https://manual.nexusformat.org/classes/"
+        + nxhtml
+        + "#"
+        + anchor.replace("_", "-")
+        + ntype
+    )
+    if not ntype:
+        anchor = anchor[:-1]  # strip the trailing "-"
+    doc = ""  # RST documentation from the field 'doc'
+    doc_field = node.find("doc")
+    if doc_field is not None:
+        doc = doc_field.text
+    (index, enums) = get_enums(node)  # enums
+    if index:
+        enum_str = (
+            "\n "
+            + ("Possible values:" if len(enums.split(",")) > 1 else "Obligatory value:")
+            + "\n "
+            + enums
+            + "\n"
+        )
+    else:
+        enum_str = ""
+    return anchor, doc + enum_str
+
+
+def print_doc(node, ntype, level, nxhtml, nxpath):
+    """Print documentation"""
+    anchor, doc = get_doc(node, ntype, nxhtml, nxpath)
+    print(" " * (level + 1) + anchor)
+    preferred_width = 80 + level * 2
+    wrapper = textwrap.TextWrapper(
+        initial_indent=" " * (level + 1),
+        width=preferred_width,
+        subsequent_indent=" " * (level + 1),
+        expand_tabs=False,
+        tabsize=0,
+    )
+    if doc is not None:
+        for par in doc.split("\n"):
+            print(wrapper.fill(par))
+
+
+def get_namespace(element):
+    """Extracts the namespace for elements in the NXDL"""
+    return element.tag[element.tag.index("{") : element.tag.rindex("}") + 1]
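+
+
+# For a typical NXDL element, get_namespace() returns
+# "{http://definition.nexusformat.org/nxdl/3.1}".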
+
+
+def get_enums(node):
+    """Make a list of enumerations, if the node contains any.
+    Returns a comma-separated string of the enumeration values if an
+    enumeration tag exists, otherwise an empty string."""
+    # collect item values from enumeration tag, if any
+    namespace = get_namespace(node)
+    enums = []
+    for enumeration in node.findall(f"{namespace}enumeration"):
+        for item in enumeration.findall(f"{namespace}item"):
+            enums.append(item.attrib["value"])
+    enums = ",".join(enums)
+    if enums != "":
+        return (True, "[" + enums + "]")
+    return (False, "")  # if there is no enumeration tag, return an empty string
+
+
+def add_base_classes(elist, nx_name=None, elem: ET.Element = None):
+    """Add the base classes corresponding to the last element in elist to the list.
+    Note that if elist is empty, an nxdl file with the name of nx_name, or the
+    provided root elem, is used.
+    """
+    if elist and nx_name is None:
+        nx_name = get_nx_class(elist[-1])
+    # to support recursive definitions, like NXsample in NXsample, the following test is removed
+    # if elist and nx_name and f"{nx_name}.nxdl.xml" in (e.get('nxdlbase') for e in elist):
+    #     return
+    if elem is None:
+        if not nx_name:
+            return
+        nxdl_file_path = find_definition_file(nx_name)
+        if nxdl_file_path is None:
+            nxdl_file_path = f"{nx_name}.nxdl.xml"
+        elem = ET.parse(nxdl_file_path).getroot()
+        elem.set("nxdlbase", nxdl_file_path)
+    else:
+        elem.set("nxdlbase", "")
+    if "category" in elem.attrib:
+        elem.set("nxdlbase_class", elem.attrib["category"])
+    elem.set("nxdlpath", "")
+    elist.append(elem)
+    # add inherited base class
+    if "extends" in elem.attrib and elem.attrib["extends"] != "NXobject":
+        add_base_classes(elist, elem.attrib["extends"])
+    else:
+        add_base_classes(elist)
+
+
+def set_nxdlpath(child, nxdl_elem):
+    """
+    Set up the child's nxdlbase, nxdlpath and nxdlbase_class from the nxdl_elem.
+ """ + if nxdl_elem.get("nxdlbase"): + child.set("nxdlbase", nxdl_elem.get("nxdlbase")) + child.set("nxdlbase_class", nxdl_elem.get("nxdlbase_class")) + child.set("nxdlpath", nxdl_elem.get("nxdlpath") + "/" + get_node_name(child)) + return child + + +def get_direct_child(nxdl_elem, html_name): + """returns the child of nxdl_elem which has a name + corresponding to the the html documentation name html_name""" + for child in nxdl_elem: + if get_local_name_from_xml(child) in ( + "group", + "field", + "attribute", + ) and html_name == get_node_name(child): + decorated_child = set_nxdlpath(child, nxdl_elem) + return decorated_child + return None + + +def get_field_child(nxdl_elem, html_name): + """returns the child of nxdl_elem which has a name + corresponding to the html documentation name html_name""" + data_child = None + for child in nxdl_elem: + if get_local_name_from_xml(child) != "field": + continue + if get_node_name(child) == html_name: + data_child = set_nxdlpath(child, nxdl_elem) + break + return data_child + + +def get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name): + """returns the child of an NXdata nxdl_elem which has a name + corresponding to the hdf_name""" + nxdata = hdf_node.parent + signals = [] + if "signal" in nxdata.attrs.keys(): + signals.append(nxdata.attrs.get("signal")) + if "auxiliary_signals" in nxdata.attrs.keys(): + for aux_signal in nxdata.attrs.get("auxiliary_signals"): + signals.append(aux_signal) + data_child = get_field_child(nxdl_elem, "DATA") + data_error_child = get_field_child(nxdl_elem, "FIELDNAME_errors") + for signal in signals: + if signal == hdf_name: + return (data_child, 100) + if hdf_name.endswith("_errors") and signal == hdf_name[:-7]: + return (data_error_child, 100) + axes = [] + if "axes" in nxdata.attrs.keys(): + for axis in nxdata.attrs.get("axes"): + axes.append(axis) + axis_child = get_field_child(nxdl_elem, "AXISNAME") + for axis in axes: + if axis == hdf_name: + return (axis_child, 100) + return (None, 0) + + +def get_best_child(nxdl_elem, hdf_node, hdf_name, hdf_class_name, nexus_type): + """returns the child of nxdl_elem which has a name + corresponding to the the html documentation name html_name""" + bestfit = -1 + bestchild = None + if ( + "name" in nxdl_elem.attrib.keys() + and nxdl_elem.attrib["name"] == "NXdata" + and hdf_node is not None + and hdf_node.parent is not None + and hdf_node.parent.attrs.get("NX_class") == "NXdata" + ): + (fnd_child, fit) = get_best_nxdata_child(nxdl_elem, hdf_node, hdf_name) + if fnd_child is not None: + return (fnd_child, fit) + for child in nxdl_elem: + fit = -2 + if get_local_name_from_xml(child) == nexus_type and ( + nexus_type != "group" or get_nx_class(child) == hdf_class_name + ): + name_any = ( + "nameType" in nxdl_elem.attrib.keys() + and nxdl_elem.attrib["nameType"] == "any" + ) + fit = get_nx_namefit(hdf_name, get_node_name(child), name_any) + if fit > bestfit: + bestfit = fit + bestchild = set_nxdlpath(child, nxdl_elem) + return (bestchild, bestfit) + + +def walk_elist(elist, html_name): + """Handle elist from low priority inheritance classes to higher""" + for ind in range(len(elist) - 1, -1, -1): + child = get_direct_child(elist[ind], html_name) + if child is None: + # check for names fitting to a superclas definition + main_child = None + for potential_direct_parent in elist: + main_child = get_direct_child(potential_direct_parent, html_name) + if main_child is not None: + (fitting_child, _) = get_best_child( + elist[ind], + None, + html_name, + get_nx_class(main_child), + 
+                    if fitting_child is not None:
+                        child = fitting_child
+                    break
+        elist[ind] = child
+        if elist[ind] is None:
+            del elist[ind]
+            continue
+        # override: remove low-priority inheritance classes if class_type is overridden
+        if len(elist) > ind + 1 and get_nx_class(elist[ind]) != get_nx_class(
+            elist[ind + 1]
+        ):
+            del elist[ind + 1 :]
+        # add new base class(es) if the new element brings such (and is not a primitive type)
+        if len(elist) == ind + 1 and get_nx_class(elist[ind])[0:3] != "NX_":
+            add_base_classes(elist)
+    return elist, html_name
+
+
+@lru_cache(maxsize=None)
+def get_inherited_nodes(
+    nxdl_path: str = None,  # pylint: disable=too-many-arguments,too-many-locals
+    nx_name: str = None,
+    elem: ET.Element = None,
+    attr=False,
+):
+    """Returns a list of ET.Element for the given path."""
+    # let us start with the given definition file
+    elist = []  # type: ignore[var-annotated]
+    add_base_classes(elist, nx_name, elem)
+    nxdl_elem_path = [elist[0]]
+
+    class_path = []  # type: ignore[var-annotated]
+    html_path = nxdl_path.split("/")[1:]
+    path = html_path
+    for pind in range(len(path)):
+        html_name = html_path[pind]
+        elist, html_name = walk_elist(elist, html_name)
+        if elist:
+            class_path.append(get_nx_class(elist[0]))
+            nxdl_elem_path.append(elist[0])
+    return (class_path, nxdl_elem_path, elist)
+
+
+def get_node_at_nxdl_path(
+    nxdl_path: str = None,
+    nx_name: str = None,
+    elem: ET.Element = None,
+    exc: bool = True,
+):
+    """Returns an ET.Element for the given path.
+    This function either takes the name of the NeXus Application Definition
+    we are looking for, or the root elem from a previously loaded NXDL file,
+    and finds the corresponding XML element with the needed attributes."""
+    try:
+        (class_path, nxdlpath, elist) = get_inherited_nodes(nxdl_path, nx_name, elem)
+    except ValueError as value_error:
+        if exc:
+            raise NxdlAttributeError(
+                f"Attributes were not found for {nxdl_path}. "
+                "Please check this entry in the template dictionary."
+            ) from value_error
+        return None
+    if class_path and nxdlpath and elist:
+        elem = elist[0]
+    else:
+        elem = None
+        if exc:
+            raise NxdlAttributeError(
+                f"Attributes were not found for {nxdl_path}. "
+                "Please check this entry in the template dictionary."
+            )
+    return elem
diff --git a/manual/source/conf.py b/manual/source/conf.py
index 51b35e4bb..a1f854be4 100644
--- a/manual/source/conf.py
+++ b/manual/source/conf.py
@@ -42,6 +42,7 @@
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.
 extensions = [
+    'sphinx_toolbox.collapse',
     'sphinx.ext.mathjax',
     'sphinx.ext.ifconfig',
     'sphinx.ext.viewcode',
@@ -94,5 +95,8 @@
 # -- Options for Latex output -------------------------------------------------
 latex_elements = {
     'maxlistdepth':7, # some application definitions are deeply nested
-    'preamble': '\\usepackage{amsbsy}\n'
+    'preamble': r'''
+        \usepackage{amsbsy}
+        \DeclareUnicodeCharacter{1F517}{X}
+        \DeclareUnicodeCharacter{2906}{<=}'''
 }
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 000000000..67878319b
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,41 @@
+[build-system]
+requires = ["setuptools>=64.0.1", "setuptools-scm[toml]>=6.2"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "nexusdefinitions"
+dynamic = ["version"]
+authors = [
+    { name = "NIAC" }
+]
+description = "NeXus definitions"
+readme = "README.md"
+license = { file = "LGPL.txt" }
+requires-python = ""
+classifiers = [
+    "Operating System :: OS Independent"
+]
+dependencies = [
+    "lxml",
+    "pyyaml",
+    "click>=7.1.2",
+    "sphinx>=5",
+    "sphinx-tabs",
+    "pytest",
+    "black>=22.3",
+    "flake8>=4",
+    "isort>=5.10",
+]
+
+[project.urls]
+"Homepage" = "https://nexusformat.org"
+
+[project.scripts]
+nyaml2nxdl = "dev_tools.nyaml2nxdl.nyaml2nxdl:launch_tool"
+
+[tool.setuptools_scm]
+version_scheme = "guess-next-dev"
+local_scheme = "node-and-date"
+
+[tool.setuptools]
+packages = ["dev_tools"]
diff --git a/requirements.txt b/requirements.txt
index 6d024bda3..91c5ae31a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,12 @@
 # Prepare for Documentation
 lxml
 pyyaml
+click

 # Documentation building
 sphinx>=5
 sphinx-tabs
+sphinx-toolbox

 # Testing
 pytest
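
To try the new converter locally, the round trip exercised by test_conversion can be reproduced in a few lines. This is a minimal sketch, assuming the branch has been installed from the repository root (e.g. `pip install -e .`, which also provides the `nyaml2nxdl` console script); `NXentry` is used purely as an example input.

    import os

    from click.testing import CliRunner

    from dev_tools.nyaml2nxdl import nyaml2nxdl as conv
    from dev_tools.utils.nxdl_utils import find_definition_file

    runner = CliRunner()
    nxdl = find_definition_file("NXentry")  # e.g. .../base_classes/NXentry.nxdl.xml

    # NXDL -> YAML: writes <name>_parsed.yaml next to the input file
    assert runner.invoke(conv.launch_tool, ["--input-file", nxdl]).exit_code == 0
    yaml_file = nxdl[: -len(".nxdl.xml")] + "_parsed.yaml"

    # YAML -> NXDL: regenerates <name>_parsed.nxdl.xml from the YAML file
    assert runner.invoke(conv.launch_tool, ["--input-file", yaml_file]).exit_code == 0
    print(os.path.exists(yaml_file[: -len("yaml")] + "nxdl.xml"))  # True on success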