From dc325851e5f3ca4ab020fc22d76148de9acc32e3 Mon Sep 17 00:00:00 2001 From: Sandor Brockhauser Date: Mon, 19 Jun 2023 14:53:03 +0200 Subject: [PATCH] nyaml2nxdl --- Makefile | 13 + dev_tools/nyaml2nxdl/README.md | 72 + dev_tools/nyaml2nxdl/__init__.py | 22 + dev_tools/nyaml2nxdl/comment_collector.py | 508 ++++++++ dev_tools/nyaml2nxdl/nyaml2nxdl.py | 227 ++++ .../nyaml2nxdl/nyaml2nxdl_backward_tools.py | 947 ++++++++++++++ .../nyaml2nxdl/nyaml2nxdl_forward_tools.py | 1161 +++++++++++++++++ dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py | 230 ++++ pyproject.toml | 43 + 9 files changed, 3223 insertions(+) create mode 100644 dev_tools/nyaml2nxdl/README.md create mode 100644 dev_tools/nyaml2nxdl/__init__.py create mode 100644 dev_tools/nyaml2nxdl/comment_collector.py create mode 100755 dev_tools/nyaml2nxdl/nyaml2nxdl.py create mode 100755 dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py create mode 100644 dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py create mode 100644 dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py create mode 100644 pyproject.toml diff --git a/Makefile b/Makefile index ae556d7339..113e29db8a 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ PYTHON = python3 SPHINX = sphinx-build BUILD_DIR = "build" +NXDL_DIRS := contributed_definitions applications base_classes .PHONY: help install style autoformat test clean prepare html pdf impatient-guide all local @@ -49,6 +50,9 @@ test :: clean :: $(RM) -rf $(BUILD_DIR) + for dir in $(NXDL_DIRS); do\ + $(RM) -rf $${dir}/nyaml;\ + done prepare :: $(PYTHON) -m dev_tools manual --prepare --build-root $(BUILD_DIR) @@ -83,6 +87,15 @@ all :: @echo "HTML built: `ls -lAFgh $(BUILD_DIR)/manual/build/html/index.html`" @echo "PDF built: `ls -lAFgh $(BUILD_DIR)/manual/build/latex/nexus.pdf`" +NXDLS := $(foreach dir,$(NXDL_DIRS),$(wildcard $(dir)/*.nxdl.xml)) +nyaml : $(DIRS) $(NXDLS) + for file in $^; do\ + mkdir -p "$${file%/*}/nyaml";\ + nyaml2nxdl --input-file $${file};\ + FNAME=$${file##*/};\ + mv -- "$${file%.nxdl.xml}_parsed.yaml" "$${file%/*}/nyaml/$${FNAME%.nxdl.xml}.yaml";\ + done + # NeXus - Neutron and X-ray Common Data Format # diff --git a/dev_tools/nyaml2nxdl/README.md b/dev_tools/nyaml2nxdl/README.md new file mode 100644 index 0000000000..ff083e1896 --- /dev/null +++ b/dev_tools/nyaml2nxdl/README.md @@ -0,0 +1,72 @@ +# YAML to NXDL converter and NXDL to YAML converter + +**NOTE: Please use python3.8 or above to run this converter** + +**Tools purpose**: Offer a simple YAML-based schema and a XML-based schema to describe NeXus instances. These can be NeXus application definitions or classes +such as base or contributed classes. Users either create NeXus instances by writing a YAML file or a XML file which details a hierarchy of data/metadata elements. +The forward (YAML -> NXDL.XML) and backward (NXDL.XML -> YAML) conversions are implemented. + +**How the tool works**: +- yaml2nxdl.py +1. Reads the user-specified NeXus instance, either in YML or XML format. +2. If input is in YAML, creates an instantiated NXDL schema XML tree by walking the dictionary nest. + If input is in XML, creates a YML file walking the dictionary nest. +3. Write the tree into a YAML file or a properly formatted NXDL XML schema file to disk. +4. Optionally, if --append argument is given, + the XML or YAML input file is interpreted as an extension of a base class and the entries contained in it + are appended below a standard NeXus base class. + You need to specify both your input file (with YAML or XML extension) and NeXus class (with no extension). + Both .yml and .nxdl.xml file of the extended class are printed. + +```console +user@box:~$ python yaml2nxdl.py + +Usage: python yaml2nxdl.py [OPTIONS] + +Options: + --input-file TEXT The path to the input data file to read. + --append TEXT Parse xml NeXus file and append to specified base class, + write the base class name with no extension. + --check-consistency Check consistency by generating another version of the input file. + E.g. for input file: NXexample.nxdl.xml the output file + NXexample_consistency.nxdl.xml. + --verbose Addictional std output info is printed to help debugging. + --help Show this message and exit. + +``` + +## Documentation + +**Rule set**: From transcoding YAML files we need to follow several rules. +* Named NeXus groups, which are instances of NeXus classes especially base or contributed classes. Creating (NXbeam) is a simple example of a request to define a group named according to NeXus default rules. mybeam1(NXbeam) or mybeam2(NXbeam) are examples how to create multiple named instances at the same hierarchy level. +* Members of groups so-called fields or attributes. A simple example of a member is voltage. Here the datatype is implied automatically as the default NeXus NX_CHAR type. By contrast, voltage(NX_FLOAT) can be used to instantiate a member of class which should be of NeXus type NX_FLOAT. +* And attributes of either groups or fields. Names of attributes have to be preceeded by \@ to mark them as attributes. +* Optionality: For all fields, groups and attributes in `application definitions` are `required` by default, except anything (`recommended` or `optional`) mentioned. + +**Special keywords**: Several keywords can be used as childs of groups, fields, and attributes to specify the members of these. Groups, fields and attributes are nodes of the XML tree. +* **doc**: A human-readable description/docstring +* **exists** Options are recommended, required, [min, 1, max, infty] numbers like here 1 can be replaced by any uint, or infty to indicate no restriction on how frequently the entry can occur inside the NXDL schema at the same hierarchy level. +* **link** Define links between nodes. +* **units** A statement introducing NeXus-compliant NXDL units arguments, like NX_VOLTAGE +* **dimensions** Details which dimensional arrays to expect +* **enumeration** Python list of strings which are considered as recommended entries to choose from. +* **dim_parameters** `dim` which is a child of `dimension` and the `dim` might have several attributes `ref`, +`incr` including `index` and `value`. So while writting `yaml` file schema definition please following structure: +``` +dimensions: + rank: integer value + dim: [[ind_1, val_1], [ind_2, val_2], ...] + dim_parameters: + ref: [ref_value_1, ref_value_2, ...] + incr: [incr_value_1, incr_value_2, ...] +``` +Keep in mind that length of all the lists must be same. + +## Next steps + +The NOMAD team is currently working on the establishing of a one-to-one mapping between +NeXus definitions and the NOMAD MetaInfo. As soon as this is in place the YAML files will +be annotated with further metadata so that they can serve two purposes. +On the one hand they can serve as an instance for a schema to create a GUI representation +of a NOMAD Oasis ELN schema. On the other hand the YAML to NXDL converter will skip all +those pieces of information which are irrelevant from a NeXus perspective. diff --git a/dev_tools/nyaml2nxdl/__init__.py b/dev_tools/nyaml2nxdl/__init__.py new file mode 100644 index 0000000000..22eb35f68d --- /dev/null +++ b/dev_tools/nyaml2nxdl/__init__.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +""" +# Load paths +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/dev_tools/nyaml2nxdl/comment_collector.py b/dev_tools/nyaml2nxdl/comment_collector.py new file mode 100644 index 0000000000..5f0c5e3bce --- /dev/null +++ b/dev_tools/nyaml2nxdl/comment_collector.py @@ -0,0 +1,508 @@ +#!usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Collect comments in a list by CommentCollector class. Comment is a instance of Comment, +where each comment includes comment text and line info or neighbour info where the +comment must be assinged. + +The class Comment is an abstract class for general functions or method to be implemented +XMLComment and YAMLComment class. + +NOTE: Here comment block mainly stands for (comment text + line or element for what comment is +intended.) +""" + + +from typing import List, Type, Any, Tuple, Union, Dict +from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import LineLoader + +__all__ = ['Comment', 'CommentCollector', 'XMLComment', 'YAMLComment'] + + +# pylint: disable=inconsistent-return-statements +class CommentCollector: + """CommentCollector will store a full comment ('Comment') object in + _comment_chain. + """ + + def __init__(self, input_file: str = None, + loaded_obj: Union[object, Dict] = None): + """ + Initialise CommentCollector + parameters: + input_file: raw input file (xml, yml) + loaded_obj: file loaded by third party library + """ + self._comment_chain: List = [] + self.file = input_file + self._comment_tracker = 0 + self._comment_hash: Dict[Tuple, Type[Comment]] = {} + self.comment: Type[Comment] + if self.file and not loaded_obj: + if self.file.split('.')[-1] == 'xml': + self.comment = XMLComment + if self.file.split('.')[-1] == 'yaml': + self.comment = YAMLComment + with open(self.file, "r", encoding="utf-8") as plain_text_yaml: + loader = LineLoader(plain_text_yaml) + self.comment.__yaml_dict__ = loader.get_single_data() + elif self.file and loaded_obj: + if self.file.split('.')[-1] == 'yaml' and isinstance(loaded_obj, dict): + self.comment = YAMLComment + self.comment.__yaml_dict__ = loaded_obj + else: + raise ValueError("Incorrect inputs for CommentCollector e.g. Wrong file extension.") + + else: + raise ValueError("Incorrect inputs for CommentCollector") + + def extract_all_comment_blocks(self): + """ + Collect all comments. Note that here comment means (comment text + element or line info + intended for comment. + """ + id_ = 0 + single_comment = self.comment(comment_id=id_) + with open(self.file, mode='r', encoding='UTF-8') as enc_f: + lines = enc_f.readlines() + # Make an empty line for last comment if no empty lines in original file + if lines[-1] != '': + lines.append('') + for line_num, line in enumerate(lines): + if single_comment.is_storing_single_comment(): + # If the last comment comes without post nxdl fields, groups and attributes + if '++ SHA HASH ++' in line: + # Handle with stored nxdl.xml file that is not part of yaml + line = '' + single_comment.process_each_line(line + 'post_comment', (line_num + 1)) + self._comment_chain.append(single_comment) + break + if line_num < (len(lines) - 1): + # Processing file from Line number 1 + single_comment.process_each_line(line, (line_num + 1)) + else: + # For processing last line of file + single_comment.process_each_line(line + 'post_comment', (line_num + 1)) + self._comment_chain.append(single_comment) + else: + self._comment_chain.append(single_comment) + single_comment = self.comment(last_comment=single_comment) + single_comment.process_each_line(line, (line_num + 1)) + + def get_comment(self): + """ + Return comment from comment_chain that must come earlier in order. + """ + return self._comment_chain[self._comment_tracker] + + def get_coment_by_line_info(self, comment_locs: Tuple[str, Union[int, str]]): + """ + Get comment using line information. + """ + if comment_locs in self._comment_hash: + return self._comment_hash[comment_locs] + + line_annot, line_loc = comment_locs + for cmnt in self._comment_chain: + if line_annot in cmnt: + line_loc_ = cmnt.get_line_number(line_annot) + if line_loc == line_loc_: + self._comment_hash[comment_locs] = cmnt + return cmnt + + def remove_comment(self, ind): + """Remove a comment from comment list. + """ + if ind < len(self._comment_chain): + del self._comment_chain[ind] + else: + raise ValueError("Oops! Index is out of range.") + + def reload_comment(self): + """ + Update self._comment_tracker after done with last comment. + """ + self._comment_tracker += 1 + + def __contains__(self, comment_locs: tuple): + """ + Confirm wether the comment corresponds to key_line and line_loc + is exist or not. + comment_locs is equvalant to (line_annotation, line_loc) e.g. + (__line__doc and 35) + """ + if not isinstance(comment_locs, tuple): + raise TypeError("Comment_locs should be 'tuple' containing line annotation " + "(e.g.__line__doc) and line_loc (e.g. 35).") + line_annot, line_loc = comment_locs + for cmnt in self._comment_chain: + if line_annot in cmnt: + line_loc_ = cmnt.get_line_number(line_annot) + if line_loc == line_loc_: + self._comment_hash[comment_locs] = cmnt + return True + return False + + def __getitem__(self, ind): + """Get comment from self.obj._comment_chain by index. + """ + if isinstance(ind, int): + if ind >= len(self._comment_chain): + raise IndexError(f'Oops! Comment index {ind} in {__class__} is out of range!') + return self._comment_chain[ind] + + if isinstance(ind, slice): + start_n = ind.start or 0 + end_n = ind.stop or len(self._comment_chain) + return self._comment_chain[start_n:end_n] + + def __iter__(self): + """get comment ieratively + """ + return iter(self._comment_chain) + + +# pylint: disable=too-many-instance-attributes +class Comment: + """ + This class is building yaml comment and the intended line for what comment is written. + """ + + def __init__(self, + comment_id: int = -1, + last_comment: 'Comment' = None) -> None: + """Comment object can be considered as a block element that includes + document element (an entity for what the comment is written). + """ + self._elemt: Any = None + self._elemt_text: str = None + self._is_elemt_found: bool = None + self._is_elemt_stored: bool = None + + self._comnt: str = '' + # If Multiple comments for one element or entity + self._comnt_list: List[str] = [] + self.last_comment: 'Comment' = last_comment if last_comment else None + if comment_id >= 0 and last_comment: + self.cid = comment_id + self.last_comment = last_comment + elif comment_id == 0 and not last_comment: + self.cid = comment_id + self.last_comment = None + elif last_comment: + self.cid = self.last_comment.cid + 1 + self.last_comment = last_comment + else: + raise ValueError("Neither last comment nor comment id dound") + self._comnt_start_found: bool = False + self._comnt_end_found: bool = False + self.is_storing_single_comment = lambda: not (self._comnt_end_found + and self._is_elemt_stored) + + def get_comment_text(self) -> Union[List, str]: + """ + Extract comment text from entrire comment (comment text + elment or + line for what comment is intended) + """ + + def append_comment(self, text: str) -> None: + """ + Append lines of the same comment. + """ + + def store_element(self, args) -> None: + """ + Strore comment text and line or element that is intended for comment. + """ + + +class XMLComment(Comment): + """ + XMLComment to store xml comment element. + """ + + def __init__(self, comment_id: int = -1, last_comment: 'Comment' = None) -> None: + super().__init__(comment_id, last_comment) + + def process_each_line(self, text, line_num): + """Take care of each line of text. Through which function the text + must be passed should be decide here. + """ + text = text.strip() + if text and line_num: + self.append_comment(text) + if self._comnt_end_found and not self._is_elemt_found: + # for multiple comment if exist + if self._comnt: + self._comnt_list.append(self._comnt) + self._comnt = '' + + if self._comnt_end_found: + self.store_element(text) + + def append_comment(self, text: str) -> None: + # Comment in single line + if '' == text[-4:]: + self._comnt_end_found = True + self._comnt_start_found = False + self._comnt = self._comnt.replace('-->', '') + + elif '-->' == text[0:4] and self._comnt_start_found: + self._comnt_end_found = True + self._comnt_start_found = False + self._comnt = self._comnt + '\n' + text.replace('-->', '') + elif self._comnt_start_found: + self._comnt = self._comnt + '\n' + text + + # pylint: disable=arguments-differ, arguments-renamed + def store_element(self, text) -> None: + def collect_xml_attributes(text_part): + for part in text_part: + part = part.strip() + if part and '">' == ''.join(part[-2:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = ''.join(part[0:-2]) + elif part and '"/>' == ''.join(part[-3:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = ''.join(part[0:-3]) + elif part and '/>' == ''.join(part[-2:]): + self._is_elemt_stored = True + self._is_elemt_found = False + part = ''.join(part[0:-2]) + elif part and '>' == part[-1]: + self._is_elemt_stored = True + self._is_elemt_found = False + part = ''.join(part[0:-1]) + elif part and '"' == part[-1]: + part = ''.join(part[0:-1]) + + if '="' in part: + lf_prt, rt_prt = part.split('="') + else: + continue + if ':' in lf_prt: + continue + self._elemt[lf_prt] = str(rt_prt) + if not self._elemt: + self._elemt = {} + # First check for comment part has been collected prefectly + if ' Union[List, str]: + """ + This method returns list of commnent text. As some xml element might have + multiple separated comment intended for a single element. + """ + return self._comnt_list + + +class YAMLComment(Comment): + """ + This class for stroing comment text as well as location of the comment e.g. line + number of other in the file. + NOTE: + 1. Do not delete any element form yaml dictionary (for loaded_obj. check: Comment_collector + class. because this loaded file has been exploited in nyaml2nxdl forward tools.) + """ + # Class level variable. The main reason behind that to follow structure of + # abstract class 'Comment' + __yaml_dict__: dict = {} + __yaml_line_info: dict = {} + __comment_escape_char = {'--': '-\\-'} + + def __init__(self, comment_id: int = -1, last_comment: 'Comment' = None) -> None: + """Initialization of YAMLComment follow Comment class. + """ + super().__init__(comment_id, last_comment) + self.collect_yaml_line_info(YAMLComment.__yaml_dict__, YAMLComment.__yaml_line_info) + + def process_each_line(self, text, line_num): + """Take care of each line of text. Through which function the text + must be passed should be decide here. + """ + text = text.strip() + self.append_comment(text) + if self._comnt_end_found and not self._is_elemt_found: + if self._comnt: + self._comnt_list.append(self._comnt) + self._comnt = '' + + if self._comnt_end_found: + line_key = '' + if ':' in text: + ind = text.index(':') + line_key = '__line__' + ''.join(text[0:ind]) + + for l_num, l_key in self.__yaml_line_info.items(): + if line_num == int(l_num) and line_key == l_key: + self.store_element(line_key, line_num) + break + # Comment comes very end of the file + if text == 'post_comment' and line_key == '': + line_key = '__line__post_comment' + self.store_element(line_key, line_num) + + def has_post_comment(self): + """ + Ensure is this a post coment or not. + Post comment means the comment that come at the very end without having any + nxdl element(class, group, filed and attribute.) + """ + for key, _ in self._elemt.items(): + if '__line__post_comment' == key: + return True + return False + + def append_comment(self, text: str) -> None: + """ + Collects all the line of the same comment and + append them with that single comment. + """ + # check for escape char + text = self.replace_scape_char(text) + # Empty line after last line of comment + if not text and self._comnt_start_found: + self._comnt_end_found = True + self._comnt_start_found = False + # For empty line inside doc or yaml file. + elif not text: + return + elif '# ' == ''.join(text[0:2]): + self._comnt_start_found = True + self._comnt_end_found = False + self._comnt = '' if not self._comnt else self._comnt + '\n' + self._comnt = self._comnt + ''.join(text[2:]) + elif '#' == text[0]: + self._comnt_start_found = True + self._comnt_end_found = False + self._comnt = '' if not self._comnt else self._comnt + '\n' + self._comnt = self._comnt + ''.join(text[1:]) + elif 'post_comment' == text: + self._comnt_end_found = True + self._comnt_start_found = False + # for any line after 'comment block' found + elif self._comnt_start_found: + self._comnt_start_found = False + self._comnt_end_found = True + + # pylint: disable=arguments-differ + def store_element(self, line_key, line_number): + """ + Store comment content and information of commen location (for what comment is + created.). + """ + self._elemt = {} + self._elemt[line_key] = int(line_number) + self._is_elemt_found = False + self._is_elemt_stored = True + + def get_comment_text(self): + """ + Return list of comments if there are multiple comment for same yaml line. + """ + return self._comnt_list + + def get_line_number(self, line_key): + """ + Retrun line number for what line the comment is created + """ + return self._elemt[line_key] + + def get_line_info(self): + """ + Return line annotation and line number from a comment. + """ + for line_anno, line_loc in self._elemt.items(): + return line_anno, line_loc + + def replace_scape_char(self, text): + """Replace escape char according to __comment_escape_char dict + """ + for ecp_char, ecp_alt in YAMLComment.__comment_escape_char.items(): + if ecp_char in text: + text = text.replace(ecp_char, ecp_alt) + return text + + def get_element_location(self): + """ + Retrun yaml line '__line__KEY' info and and line numner + """ + if len(self._elemt) > 1: + raise ValueError(f"Comment element should be one but got " + f"{self._elemt}") + + for key, val in self._elemt.items(): + yield key, val + + def collect_yaml_line_info(self, yaml_dict, line_info_dict): + """Collect __line__key and corresponding value from + a yaml file dictonary in another dictionary. + """ + for line_key, line_n in yaml_dict.items(): + if '__line__' in line_key: + line_info_dict[line_n] = line_key + + for _, val in yaml_dict.items(): + if isinstance(val, dict): + self.collect_yaml_line_info(val, line_info_dict) + + def __contains__(self, line_key): + """For Checking whether __line__NAME is in _elemt dict or not.""" + return line_key in self._elemt + + def __eq__(self, comment_obj): + """Check the self has same value as right comment. + """ + if len(self._comnt_list) != len(comment_obj._comnt_list): + return False + for left_cmnt, right_cmnt in zip(self._comnt_list, comment_obj._comnt_list): + left_cmnt = left_cmnt.split('\n') + right_cmnt = right_cmnt.split('\n') + for left_line, right_line in zip(left_cmnt, right_cmnt): + if left_line.strip() != right_line.strip(): + return False + return True diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl.py b/dev_tools/nyaml2nxdl/nyaml2nxdl.py new file mode 100755 index 0000000000..160b3f830d --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +"""Main file of yaml2nxdl tool. +Users create NeXus instances by writing a YAML file +which details a hierarchy of data/metadata elements + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import xml.etree.ElementTree as ET + +import click +from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import (get_sha256_hash, + extend_yamlfile_with_comment, + separate_hash_yaml_and_nxdl) +from pynxtools.nyaml2nxdl.nyaml2nxdl_forward_tools import nyaml2nxdl, pretty_print_xml +from pynxtools.nyaml2nxdl.nyaml2nxdl_backward_tools import (Nxdl2yaml, + compare_niac_and_my) + + +DEPTH_SIZE = 4 * " " + +# NOTE: Some handful links for nyaml2nxdl converter: +# https://manual.nexusformat.org/nxdl_desc.html?highlight=optional + + +def generate_nxdl_or_retrieve_nxdl(yaml_file, out_xml_file, verbose): + """ + Generate yaml, nxdl and hash. + if the extracted hash is exactly the same as producd from generated yaml then + retrieve the nxdl part from provided yaml. + Else, generate nxdl from separated yaml with the help of nyaml2nxdl function + """ + pa_path, rel_file = os.path.split(yaml_file) + sep_yaml = os.path.join(pa_path, f'temp_{rel_file}') + hash_found = separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, out_xml_file) + + if hash_found: + gen_hash = get_sha256_hash(sep_yaml) + if hash_found == gen_hash: + os.remove(sep_yaml) + return + + nyaml2nxdl(sep_yaml, out_xml_file, verbose) + os.remove(sep_yaml) + + +# pylint: disable=too-many-locals +def append_yml(input_file, append, verbose): + """Append to an existing NeXus base class new elements provided in YML input file \ +and print both an XML and YML file of the extended base class. + +""" + nexus_def_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../../definitions') + assert [s for s in os.listdir(os.path.join(nexus_def_path, 'base_classes') + ) if append.strip() == s.replace('.nxdl.xml', '')], \ + 'Your base class extension does not match any existing NeXus base classes' + tree = ET.parse(os.path.join(nexus_def_path + '/base_classes', append + '.nxdl.xml')) + root = tree.getroot() + # warning: tmp files are printed on disk and removed at the ends!! + pretty_print_xml(root, 'tmp.nxdl.xml') + input_tmp_xml = 'tmp.nxdl.xml' + out_tmp_yml = 'tmp_parsed.yaml' + converter = Nxdl2yaml([], []) + converter.print_yml(input_tmp_xml, out_tmp_yml, verbose) + nyaml2nxdl(input_file=out_tmp_yml, + out_file='tmp_parsed.nxdl.xml', + verbose=verbose) + tree = ET.parse('tmp_parsed.nxdl.xml') + tree2 = ET.parse(input_file) + root_no_duplicates = ET.Element( + 'definition', {'xmlns': 'http://definition.nexusformat.org/nxdl/3.1', + 'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance', + 'xsi:schemaLocation': 'http://www.w3.org/2001/XMLSchema-instance' + } + ) + for attribute_keys in root.attrib.keys(): + if attribute_keys != '{http://www.w3.org/2001/XMLSchema-instance}schemaLocation': + attribute_value = root.attrib[attribute_keys] + root_no_duplicates.set(attribute_keys, attribute_value) + for elems in root.iter(): + if 'doc' in elems.tag: + root_doc = ET.SubElement(root_no_duplicates, 'doc') + root_doc.text = elems.text + break + group = '{http://definition.nexusformat.org/nxdl/3.1}group' + root_no_duplicates = compare_niac_and_my(tree, tree2, verbose, + group, + root_no_duplicates) + field = '{http://definition.nexusformat.org/nxdl/3.1}field' + root_no_duplicates = compare_niac_and_my(tree, tree2, verbose, + field, + root_no_duplicates) + attribute = '{http://definition.nexusformat.org/nxdl/3.1}attribute' + root_no_duplicates = compare_niac_and_my(tree, tree2, verbose, + attribute, + root_no_duplicates) + pretty_print_xml(root_no_duplicates, f"{input_file.replace('.nxdl.xml', '')}" + f"_appended.nxdl.xml") + + input_file_xml = input_file.replace('.nxdl.xml', "_appended.nxdl.xml") + out_file_yml = input_file.replace('.nxdl.xml', "_appended_parsed.yaml") + converter = Nxdl2yaml([], []) + converter.print_yml(input_file_xml, out_file_yml, verbose) + nyaml2nxdl(input_file=out_file_yml, + out_file=out_file_yml.replace('.yaml', '.nxdl.xml'), + verbose=verbose) + os.rename(f"{input_file.replace('.nxdl.xml', '_appended_parsed.yaml')}", + f"{input_file.replace('.nxdl.xml', '_appended.yaml')}") + os.rename(f"{input_file.replace('.nxdl.xml', '_appended_parsed.nxdl.xml')}", + f"{input_file.replace('.nxdl.xml', '_appended.nxdl.xml')}") + os.remove('tmp.nxdl.xml') + os.remove('tmp_parsed.yaml') + os.remove('tmp_parsed.nxdl.xml') + + +def split_name_and_extension(file_name): + """ + Split file name into extension and rest of the file name. + return file raw nam and extension + """ + parts = file_name.rsplit('.', 3) + if len(parts) == 2: + raw = parts[0] + ext = parts[1] + if len(parts) == 3: + raw = parts[0] + ext = '.'.join(parts[1:]) + + return raw, ext + + +@click.command() +@click.option( + '--input-file', + required=True, + prompt=True, + help='The path to the XML or YAML input data file to read and create \ +a YAML or XML file from, respectively.' +) +@click.option( + '--append', + help='Parse xml file and append to base class, given that the xml file has same name \ +of an existing base class' +) +@click.option( + '--check-consistency', + is_flag=True, + default=False, + help=('Check wether yaml or nxdl has followed general rules of scema or not' + 'check whether your comment in the right place or not. The option render an ' + 'output file of the same extension(*_consistency.yaml or *_consistency.nxdl.xml)') +) +@click.option( + '--verbose', + is_flag=True, + default=False, + help='Print in standard output keywords and value types to help \ +possible issues in yaml files' +) +def launch_tool(input_file, verbose, append, check_consistency): + """ + Main function that distiguishes the input file format and launches the tools. + """ + if os.path.isfile(input_file): + raw_name, ext = split_name_and_extension(input_file) + else: + raise ValueError("Need a valid input file.") + + if ext == 'yaml': + xml_out_file = raw_name + '.nxdl.xml' + generate_nxdl_or_retrieve_nxdl(input_file, xml_out_file, verbose) + if append: + append_yml(raw_name + '.nxdl.xml', + append, + verbose + ) + # For consistency running + if check_consistency: + yaml_out_file = raw_name + '_consistency.' + ext + converter = Nxdl2yaml([], []) + converter.print_yml(xml_out_file, yaml_out_file, verbose) + os.remove(xml_out_file) + elif ext == 'nxdl.xml': + if not append: + yaml_out_file = raw_name + '_parsed' + '.yaml' + converter = Nxdl2yaml([], []) + converter.print_yml(input_file, yaml_out_file, verbose) + # Append nxdl.xml file with yaml output file + yaml_hash = get_sha256_hash(yaml_out_file) + # Lines as divider between yaml and nxdl + top_lines = [('\n# ++++++++++++++++++++++++++++++++++ SHA HASH' + ' ++++++++++++++++++++++++++++++++++\n'), + f'# {yaml_hash}\n'] + + extend_yamlfile_with_comment(yaml_file=yaml_out_file, + file_to_be_appended=input_file, + top_lines_list=top_lines) + else: + append_yml(input_file, append, verbose) + # Taking care of consistency running + if check_consistency: + xml_out_file = raw_name + '_consistency.' + ext + generate_nxdl_or_retrieve_nxdl(yaml_out_file, xml_out_file, verbose) + os.remove(yaml_out_file) + else: + raise ValueError("Provide correct file with extension '.yaml or '.nxdl.xml") + + +if __name__ == '__main__': + launch_tool().parse() # pylint: disable=no-value-for-parameter diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py new file mode 100755 index 0000000000..72f5a6c426 --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_backward_tools.py @@ -0,0 +1,947 @@ +#!/usr/bin/env python3 +"""This file collects the function used in the reverse tool nxdl2yaml. + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import sys +from typing import List, Dict +import xml.etree.ElementTree as ET +import os + +from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import (get_node_parent_info, + get_yaml_escape_char_dict, + cleaning_empty_lines) +from pynxtools.dataconverter.helpers import remove_namespace_from_tag + + +DEPTH_SIZE = " " +CMNT_TAG = '!--' + + +def separate_pi_comments(input_file): + """ + Separate PI comments from ProcessesInstruction (pi) + """ + comments_list = [] + comment = [] + xml_lines = [] + + with open(input_file, "r", encoding='utf-8') as file: + lines = file.readlines() + has_pi = True + for line in lines: + c_start = '' + def_tag = ' 0 and has_pi: + comment.append(line.replace(cmnt_end, '')) + comments_list.append(''.join(comment)) + comment = [] + elif def_tag in line or not has_pi: + has_pi = False + xml_lines.append(line) + elif len(comment) > 0 and has_pi: + comment.append(line) + else: + xml_lines.append(line) + return comments_list, ''.join(xml_lines) + + +# Collected: https://dustinoprea.com/2019/01/22/python-parsing-xml-and-retaining-the-comments/ +class _CommentedTreeBuilder(ET.TreeBuilder): + + def comment(self, text): + """ + defining comment builder in TreeBuilder + """ + self.start('!--', {}) + self.data(text) + self.end('--') + + +def parse(filepath): + """ + Construct parse function for modified tree builder for including modified TreeBuilder + and rebuilding XMLParser. + """ + comments, xml_str = separate_pi_comments(filepath) + ctb = _CommentedTreeBuilder() + xp_parser = ET.XMLParser(target=ctb) + root = ET.fromstring(xml_str, parser=xp_parser) + return comments, root + + +def handle_mapping_char(text, depth=-1, skip_n_line_on_top=False): + """Check for ":" char and replace it by "':'". """ + + escape_char = get_yaml_escape_char_dict() + for esc_key, val in escape_char.items(): + if esc_key in text: + text = text.replace(esc_key, val) + if not skip_n_line_on_top: + if depth > 0: + text = add_new_line_with_pipe_on_top(text, depth) + else: + raise ValueError("Need depth size to co-ordinate text line in yaml file.") + return text + + +def add_new_line_with_pipe_on_top(text, depth): + """ + Return modified text for what we get error in converter, such as ':'. After adding a + new line at the start of text the error is solved. + """ + char_list_to_add_new_line_on_top_of_text = [":"] + for char in char_list_to_add_new_line_on_top_of_text: + if char in text: + return '|' + '\n' + depth * DEPTH_SIZE + text + return text + + +# pylint: disable=too-many-instance-attributes +class Nxdl2yaml(): + """ + Parse XML file and print a YML file + """ + + def __init__( + self, + symbol_list: List[str], + root_level_definition: List[str], + root_level_doc='', + root_level_symbols=''): + + # updated part of yaml_dict + self.found_definition = False + self.root_level_doc = root_level_doc + self.root_level_symbols = root_level_symbols + self.root_level_definition = root_level_definition + self.symbol_list = symbol_list + self.is_last_element_comment = False + self.include_comment = True + self.pi_comments = None + # NOTE: Here is how root_level_comments organised for storing comments + # root_level_comment= {'root_doc': comment, + # 'symbols': comment, + # The 'symbol_doc_comments' list is for comments from all 'symbol doc' + # 'symbol_doc_comments' : [comments] + # 'symbol_list': [symbols], + # The 'symbol_comments' contains comments for 'symbols doc' and all 'symbol' + # 'symbol_comments': [comments]} + self.root_level_comment: Dict[str, str] = {} + + def print_yml(self, input_file, output_yml, verbose): + """ + Parse an XML file provided as input and print a YML file + """ + if os.path.isfile(output_yml): + os.remove(output_yml) + + depth = 0 + + self.pi_comments, root = parse(input_file) + xml_tree = {'tree': root, 'node': root} + self.xmlparse(output_yml, xml_tree, depth, verbose) + + def handle_symbols(self, depth, node): + """Handle symbols field and its childs symbol""" + + # pylint: disable=consider-using-f-string + self.root_level_symbols = ( + f"{remove_namespace_from_tag(node.tag)}: " + f"{node.text.strip() if node.text else ''}" + ) + depth += 1 + last_comment = '' + sbl_doc_cmnt_list = [] + # Comments that come above symbol tag + symbol_cmnt_list = [] + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + if tag == CMNT_TAG and self.include_comment: + last_comment = self.comvert_to_ymal_comment(depth * DEPTH_SIZE, child.text) + if tag == 'doc': + symbol_cmnt_list.append(last_comment) + # The bellow line is for handling lenth of 'symbol_comments' and + # 'symbol_doc_comments'. Otherwise print_root_level_info() gets inconsistency + # over for the loop while writting comment on file + sbl_doc_cmnt_list.append('') + last_comment = '' + self.symbol_list.append(self.handle_not_root_level_doc(depth, + text=child.text)) + elif tag == 'symbol': + # place holder is symbol name + symbol_cmnt_list.append(last_comment) + last_comment = '' + if 'doc' in child.attrib: + self.symbol_list.append( + self.handle_not_root_level_doc(depth, + tag=child.attrib['name'], + text=child.attrib['doc'])) + else: + for symbol_doc in list(child): + tag = remove_namespace_from_tag(symbol_doc.tag) + if tag == CMNT_TAG and self.include_comment: + last_comment = self.comvert_to_ymal_comment(depth * DEPTH_SIZE, + symbol_doc.text) + if tag == 'doc': + sbl_doc_cmnt_list.append(last_comment) + last_comment = '' + self.symbol_list.append( + self.handle_not_root_level_doc(depth, + tag=child.attrib['name'], + text=symbol_doc.text)) + self.store_root_level_comments('symbol_doc_comments', sbl_doc_cmnt_list) + self.store_root_level_comments('symbol_comments', symbol_cmnt_list) + + def store_root_level_comments(self, holder, comment): + """Store yaml text or section line and the comments inteded for that lines or section""" + + self.root_level_comment[holder] = comment + + def handle_definition(self, node): + """ + Handle definition group and its attributes + NOTE: Here we tried to store the order of the xml element attributes. So that we get + exactly the same file in nxdl from yaml. + """ + # pylint: disable=consider-using-f-string + # self.root_level_definition[0] = '' + keyword = '' + # tmp_word for reseving the location + tmp_word = "#xx#" + attribs = node.attrib + # for tracking the order of name and type + keyword_order = -1 + for item in attribs: + if "name" in item: + keyword = keyword + attribs[item] + if keyword_order == -1: + self.root_level_definition.append(tmp_word) + keyword_order = self.root_level_definition.index(tmp_word) + elif "extends" in item: + keyword = f"{keyword}({attribs[item]})" + if keyword_order == -1: + self.root_level_definition.append(tmp_word) + keyword_order = self.root_level_definition.index(tmp_word) + elif 'schemaLocation' not in item \ + and 'extends' != item: + text = f"{item}: {attribs[item]}" + self.root_level_definition.append(text) + self.root_level_definition[keyword_order] = f"{keyword}:" + + def handle_root_level_doc(self, node): + """ + Handle the documentation field found at root level. + """ + # tag = remove_namespace_from_tag(node.tag) + text = node.text + text = self.handle_not_root_level_doc(depth=0, text=text) + self.root_level_doc = text + + # pylint: disable=too-many-branches + def handle_not_root_level_doc(self, depth, text, tag='doc', file_out=None): + """ + Handle docs field along the yaml file. In this function we also tried to keep + the track of intended indentation. E.g. the bollow doc block. + * Topic name + Description of topic + """ + + # Handling empty doc + if not text: + text = "" + else: + text = handle_mapping_char(text, -1, True) + if "\n" in text: + # To remove '\n' character as it will be added before text. + text = cleaning_empty_lines(text.split('\n')) + text_tmp = [] + yaml_indent_n = len((depth + 1) * DEPTH_SIZE) + # Find indentaion in the first text line with alphabet + tmp_i = 0 + while tmp_i != -1: + first_line_indent_n = 0 + # Taking care of empty text whitout any character + if len(text) == 1 and text[0] == '': + break + for ch_ in text[tmp_i]: + if ch_ == ' ': + first_line_indent_n = first_line_indent_n + 1 + elif ch_ != '': + tmp_i = -2 + break + tmp_i = tmp_i + 1 + # Taking care of doc like bellow: + # Text liness + # text continues + # So no indentaion at the staring or doc. So doc group will come along general + # alignment + if first_line_indent_n == 0: + first_line_indent_n = yaml_indent_n + + # for indent_diff -ve all lines will move left by the same ammout + # for indect_diff +ve all lines will move right the same amount + indent_diff = yaml_indent_n - first_line_indent_n + # CHeck for first line empty if not keep first line empty + + for _, line in enumerate(text): + line_indent_n = 0 + # Collect first empty space without alphabate + for ch_ in line: + if ch_ == ' ': + line_indent_n = line_indent_n + 1 + else: + break + line_indent_n = line_indent_n + indent_diff + if line_indent_n < yaml_indent_n: + # if line still under yaml identation + text_tmp.append(yaml_indent_n * ' ' + line.strip()) + else: + text_tmp.append(line_indent_n * ' ' + line.strip()) + + text = '\n' + '\n'.join(text_tmp) + if "}" in tag: + tag = remove_namespace_from_tag(tag) + indent = depth * DEPTH_SIZE + elif text: + text = '\n' + (depth + 1) * DEPTH_SIZE + text.strip() + if "}" in tag: + tag = remove_namespace_from_tag(tag) + indent = depth * DEPTH_SIZE + else: + text = "" + if "}" in tag: + tag = remove_namespace_from_tag(tag) + indent = depth * DEPTH_SIZE + + doc_str = f"{indent}{tag}: |{text}\n" + if file_out: + file_out.write(doc_str) + return None + return doc_str + + def write_out(self, indent, text, file_out): + """ + Write text line in output file. + """ + line_string = f"{indent}{text.rstrip()}\n" + file_out.write(line_string) + + def print_root_level_doc(self, file_out): + """ + Print at the root level of YML file \ + the general documentation field found in XML file + """ + indent = 0 * DEPTH_SIZE + + if ('root_doc' in self.root_level_comment + and self.root_level_comment['root_doc'] != ''): + text = self.root_level_comment['root_doc'] + self.write_out(indent, text, file_out) + + text = self.root_level_doc + self.write_out(indent, text, file_out) + self.root_level_doc = '' + + def comvert_to_ymal_comment(self, indent, text): + """ + Convert into yaml comment by adding exta '#' char in front of comment lines + """ + lines = text.split('\n') + mod_lines = [] + for line in lines: + line = line.strip() + if line and line[0] != '#': + line = indent + '# ' + line + mod_lines.append(line) + elif line: + line = indent + line + mod_lines.append(line) + # The starting '\n' to keep multiple comments separate + return '\n' + '\n'.join(mod_lines) + + def print_root_level_info(self, depth, file_out): + """ + Print at the root level of YML file \ + the information stored as definition attributes in the XML file + """ + # pylint: disable=consider-using-f-string + if depth < 0: + raise ValueError("Somthing wrong with indentaion in root level.") + + has_categoty = False + for def_line in self.root_level_definition: + if def_line in ("category: application", "category: base"): + self.write_out(indent=0 * DEPTH_SIZE, text=def_line, file_out=file_out) + # file_out.write(f"{def_line}\n") + has_categoty = True + + if not has_categoty: + raise ValueError("Definition dose not get any category from 'base or application'.") + self.print_root_level_doc(file_out) + if 'symbols' in self.root_level_comment and self.root_level_comment['symbols'] != '': + indent = depth * DEPTH_SIZE + text = self.root_level_comment['symbols'] + self.write_out(indent, text, file_out) + if self.root_level_symbols: + self.write_out(indent=0 * DEPTH_SIZE, text=self.root_level_symbols, file_out=file_out) + # symbol_list include 'symbols doc', and all 'symbol' + for ind, symbol in enumerate(self.symbol_list): + # Taking care of comments that come on to of 'symbols doc' and 'symbol' + if 'symbol_comments' in self.root_level_comment and \ + self.root_level_comment['symbol_comments'][ind] != '': + indent = depth * DEPTH_SIZE + self.write_out(indent, + self.root_level_comment['symbol_comments'][ind], file_out) + if 'symbol_doc_comments' in self.root_level_comment and \ + self.root_level_comment['symbol_doc_comments'][ind] != '': + + indent = depth * DEPTH_SIZE + self.write_out(indent, + self.root_level_comment['symbol_doc_comments'][ind], file_out) + + self.write_out(indent=(0 * DEPTH_SIZE), text=symbol, file_out=file_out) + if len(self.pi_comments) > 1: + indent = DEPTH_SIZE * depth + # The first comment is top level copy-right doc string + for comment in self.pi_comments[1:]: + self.write_out(indent, self.comvert_to_ymal_comment(indent, comment), file_out) + if self.root_level_definition: + # Soring NXname for writting end of the definition attributes + nx_name = '' + for defs in self.root_level_definition: + if 'NX' in defs and defs[-1] == ':': + nx_name = defs + continue + if defs in ("category: application", "category: base"): + continue + self.write_out(indent=0 * DEPTH_SIZE, text=defs, file_out=file_out) + self.write_out(indent=0 * DEPTH_SIZE, text=nx_name, file_out=file_out) + self.found_definition = False + + def handle_exists(self, exists_dict, key, val): + """ + Create exist component as folows: + + {'min' : value for min, + 'max' : value for max, + 'optional' : value for optional} + + This is created separately so that the keys stays in order. + """ + if not val: + val = '' + else: + val = str(val) + if 'minOccurs' == key: + exists_dict['minOccurs'] = ['min', val] + if 'maxOccurs' == key: + exists_dict['maxOccurs'] = ['max', val] + if 'optional' == key: + exists_dict['optional'] = ['optional', val] + if 'recommended' == key: + exists_dict['recommended'] = ['recommended', val] + if 'required' == key: + exists_dict['required'] = ['required', val] + + # pylint: disable=too-many-branches, consider-using-f-string + def handle_group_or_field(self, depth, node, file_out): + """Handle all the possible attributes that come along a field or group""" + + allowed_attr = ['optional', 'recommended', 'name', 'type', 'axes', 'axis', 'data_offset', + 'interpretation', 'long_name', 'maxOccurs', 'minOccurs', 'nameType', + 'optional', 'primary', 'signal', 'stride', 'units', 'required', + 'deprecated', 'exists'] + + name_type = "" + node_attr = node.attrib + rm_key_list = [] + # Maintain order: name and type in form name(type) or (type)name that come first + for key, val in node_attr.items(): + if key == 'name': + name_type = name_type + val + rm_key_list.append(key) + if key == 'type': + name_type = name_type + "(%s)" % val + rm_key_list.append(key) + if not name_type: + raise ValueError(f"No 'name' or 'type' hase been found. But, 'group' or 'field' " + f"must have at list a nme.We got attributes: {node_attr}") + file_out.write('{indent}{name_type}:\n'.format( + indent=depth * DEPTH_SIZE, + name_type=name_type)) + + for key in rm_key_list: + del node_attr[key] + + # tmp_dict intended to persevere order of attribnutes + tmp_dict = {} + exists_dict = {} + for key, val in node_attr.items(): + # As both 'minOccurs', 'maxOccurs' and optionality move to the 'exists' + if key in ['minOccurs', 'maxOccurs', 'optional', 'recommended', 'required']: + if 'exists' not in tmp_dict: + tmp_dict['exists'] = [] + self.handle_exists(exists_dict, key, val) + elif key == 'units': + tmp_dict['unit'] = str(val) + else: + tmp_dict[key] = str(val) + if key not in allowed_attr: + raise ValueError(f"An attribute ({key}) in 'field' or 'group' has been found " + f"that is not allowed. The allowed attr is {allowed_attr}.") + + if exists_dict: + for key, val in exists_dict.items(): + if key in ['minOccurs', 'maxOccurs']: + tmp_dict['exists'] = tmp_dict['exists'] + val + elif key in ['optional', 'recommended', 'required']: + tmp_dict['exists'] = key + + depth_ = depth + 1 + for key, val in tmp_dict.items(): + # Increase depth size inside handle_map...() for writting text with one + # more indentation. + file_out.write(f'{depth_ * DEPTH_SIZE}{key}: ' + f'{handle_mapping_char(val, depth_ + 1, False)}\n') + + # pylint: disable=too-many-branches, too-many-locals + def handle_dimension(self, depth, node, file_out): + """ + Handle the dimension field. + NOTE: Usually we take care of any xml element in xmlparse(...) and + recursion_in_xml_tree(...) functions. But Here it is a bit different. The doc dimension + and attributes of dim has been handled inside this function here. + """ + # pylint: disable=consider-using-f-string + possible_dim_attrs = ['ref', 'required', + 'incr', 'refindex'] + possible_dimemsion_attrs = ['rank'] + + # taking care of Dimension tag + file_out.write( + '{indent}{tag}:\n'.format( + indent=depth * DEPTH_SIZE, + tag=node.tag.split("}", 1)[1])) + # Taking care of dimension attributes + for attr, value in node.attrib.items(): + if attr in possible_dimemsion_attrs and not isinstance(value, dict): + indent = (depth + 1) * DEPTH_SIZE + file_out.write(f'{indent}{attr}: {value}\n') + else: + raise ValueError(f"Dimension has got an attribute {attr} that is not valid." + f"Current the allowd atributes are {possible_dimemsion_attrs}." + f" Please have a look") + # taking carew of dimension doc + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + if tag == 'doc': + text = self.handle_not_root_level_doc(depth + 1, child.text) + file_out.write(text) + node.remove(child) + + dim_index_value = '' + dim_other_parts = {} + dim_cmnt_node = [] + # taking care of dim and doc childs of dimension + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + child_attrs = child.attrib + # taking care of index and value attributes + if tag == ('dim'): + # taking care of index and value in format [[index, value]] + dim_index_value = dim_index_value + '[{index}, {value}], '.format( + index=child_attrs['index'] if "index" in child_attrs else '', + value=child_attrs['value'] if "value" in child_attrs else '') + if "index" in child_attrs: + del child_attrs["index"] + if "value" in child_attrs: + del child_attrs["value"] + + # Taking care of doc comes as child of dim + for cchild in list(child): + ttag = cchild.tag.split("}", 1)[1] + if ttag == ('doc'): + if ttag not in dim_other_parts: + dim_other_parts[ttag] = [] + text = cchild.text + dim_other_parts[ttag].append(text.strip()) + child.remove(cchild) + continue + # taking care of other attributes except index and value + for attr, value in child_attrs.items(): + if attr in possible_dim_attrs: + if attr not in dim_other_parts: + dim_other_parts[attr] = [] + dim_other_parts[attr].append(value) + if tag == CMNT_TAG and self.include_comment: + # Store and remove node so that comment nodes from dim node so + # that it does not call in xmlparser function + dim_cmnt_node.append(child) + node.remove(child) + + # All 'dim' element comments on top of 'dim' yaml key + if dim_cmnt_node: + for ch_nd in dim_cmnt_node: + self.handel_comment(depth + 1, ch_nd, file_out) + # index and value attributes of dim elements + file_out.write( + '{indent}dim: [{value}]\n'.format( + indent=(depth + 1) * DEPTH_SIZE, + value=dim_index_value[:-2] or '')) + # Write the attributes, except index and value, and doc of dim as child of dim_parameter. + # But tthe doc or attributes for each dim come inside list according to the order of dim. + if dim_other_parts: + file_out.write( + '{indent}dim_parameters:\n'.format( + indent=(depth + 1) * DEPTH_SIZE)) + # depth = depth + 2 dim_paramerter has child such as doc of dim + indent = (depth + 2) * DEPTH_SIZE + for key, value in dim_other_parts.items(): + if key == 'doc': + value = self.handle_not_root_level_doc(depth + 2, str(value), key, file_out) + else: + # Increase depth size inside handle_map...() for writting text with one + # more indentation. + file_out.write(f"{indent}{key}: " + f"{handle_mapping_char(value, depth + 3, False)}\n") + + def handle_enumeration(self, depth, node, file_out): + """ + Handle the enumeration field parsed from the xml file. + + If the enumeration items contain a doc field, the yaml file will contain items as child + fields of the enumeration field. + + If no doc are inherited in the enumeration items, a list of the items is given for the + enumeration list. + + """ + # pylint: disable=consider-using-f-string + + check_doc = [] + for child in list(node): + if list(child): + check_doc.append(list(child)) + # pylint: disable=too-many-nested-blocks + if check_doc: + file_out.write( + '{indent}{tag}: \n'.format( + indent=depth * DEPTH_SIZE, + tag=node.tag.split("}", 1)[1])) + for child in list(node): + tag = remove_namespace_from_tag(child.tag) + itm_depth = depth + 1 + if tag == ('item'): + file_out.write( + '{indent}{value}: \n'.format( + indent=(itm_depth) * DEPTH_SIZE, + value=child.attrib['value'])) + + if list(child): + for item_doc in list(child): + if remove_namespace_from_tag(item_doc.tag) == 'doc': + item_doc_depth = itm_depth + 1 + self.handle_not_root_level_doc(item_doc_depth, item_doc.text, + item_doc.tag, file_out) + if (remove_namespace_from_tag(item_doc.tag) == CMNT_TAG + and self.include_comment): + self.handel_comment(itm_depth + 1, item_doc, file_out) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(itm_depth + 1, child, file_out) + else: + enum_list = '' + remove_nodes = [] + for item_child in list(node): + tag = remove_namespace_from_tag(item_child.tag) + if tag == ('item'): + enum_list = enum_list + '{value}, '.format( + value=item_child.attrib['value']) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(depth, item_child, file_out) + remove_nodes.append(item_child) + for ch_node in remove_nodes: + node.remove(ch_node) + + file_out.write( + '{indent}{tag}: [{enum_list}]\n'.format( + indent=depth * DEPTH_SIZE, + tag=remove_namespace_from_tag(node.tag), + enum_list=enum_list[:-2] or '')) + + def handle_attributes(self, depth, node, file_out): + """Handle the attributes parsed from the xml file""" + + allowed_attr = ['name', 'type', 'units', 'nameType', 'recommended', 'optional', + 'minOccurs', 'maxOccurs', 'deprecated'] + + name = "" + node_attr = node.attrib + if 'name' in node_attr: + pass + else: + raise ValueError("Attribute must have an name key.") + rm_key_list = [] + # Maintain order: name and type in form name(type) or (type)name that come first + for key, val in node_attr.items(): + if key == 'name': + name = val + rm_key_list.append(key) + + for key in rm_key_list: + del node_attr[key] + + file_out.write('{indent}{escapesymbol}{name}:\n'.format( + indent=depth * DEPTH_SIZE, + escapesymbol=r'\@', + name=name)) + + tmp_dict = {} + exists_dict = {} + for key, val in node_attr.items(): + # As both 'minOccurs', 'maxOccurs' and optionality move to the 'exists' + if key in ['minOccurs', 'maxOccurs', 'optional', 'recommended', 'required']: + if 'exists' not in tmp_dict: + tmp_dict['exists'] = [] + self.handle_exists(exists_dict, key, val) + elif key == 'units': + tmp_dict['unit'] = val + else: + tmp_dict[key] = val + if key not in allowed_attr: + raise ValueError(f"An attribute ({key}) has been found that is not allowed." + f"The allowed attr is {allowed_attr}.") + + has_min_max = False + has_opt_reco_requ = False + if exists_dict: + for key, val in exists_dict.items(): + if key in ['minOccurs', 'maxOccurs']: + tmp_dict['exists'] = tmp_dict['exists'] + val + has_min_max = True + elif key in ['optional', 'recommended', 'required']: + tmp_dict['exists'] = key + has_opt_reco_requ = True + if has_min_max and has_opt_reco_requ: + raise ValueError("Optionality 'exists' can take only either from ['minOccurs'," + " 'maxOccurs'] or from ['optional', 'recommended', 'required']" + ". But not from both of the groups together. Please check in" + " attributes") + + depth_ = depth + 1 + for key, val in tmp_dict.items(): + # Increase depth size inside handle_map...() for writting text with one + # more indentation. + file_out.write(f'{depth_ * DEPTH_SIZE}{key}: ' + f'{handle_mapping_char(val, depth_ + 1, False)}\n') + + def handel_link(self, depth, node, file_out): + """ + Handle link elements of nxdl + """ + + possible_link_attrs = ['name', 'target', 'napimount'] + node_attr = node.attrib + # Handle special cases + if 'name' in node_attr: + file_out.write('{indent}{name}(link):\n'.format( + indent=depth * DEPTH_SIZE, + name=node_attr['name'] or '')) + del node_attr['name'] + + depth_ = depth + 1 + # Handle general cases + for attr_key, val in node_attr.items(): + if attr_key in possible_link_attrs: + file_out.write('{indent}{attr}: {value}\n'.format( + indent=depth_ * DEPTH_SIZE, + attr=attr_key, + value=val)) + else: + raise ValueError(f"An anexpected attribute '{attr_key}' of link has found." + f"At this moment the alloed keys are {possible_link_attrs}") + + def handel_choice(self, depth, node, file_out): + """ + Handle choice element which is a parent node of group. + """ + + possible_attr = [] + + node_attr = node.attrib + # Handle special casees + if 'name' in node_attr: + file_out.write('{indent}{attr}(choice): \n'.format( + indent=depth * DEPTH_SIZE, + attr=node_attr['name'])) + del node_attr['name'] + + depth_ = depth + 1 + # Taking care of general attrinutes. Though, still no attrinutes have found, + # but could be used for future + for attr in node_attr.items(): + if attr in possible_attr: + file_out.write('{indent}{attr}: {value}\n'.format( + indent=depth_ * DEPTH_SIZE, + attr=attr, + value=node_attr[attr])) + else: + raise ValueError(f"An unexpected attribute '{attr}' of 'choice' has been found." + f"At this moment attributes for choice {possible_attr}") + + def handel_comment(self, depth, node, file_out): + """ + Collect comment element and pass to write_out function + """ + indent = depth * DEPTH_SIZE + if self.is_last_element_comment: + text = self.comvert_to_ymal_comment(indent, node.text) + self.write_out(indent, text, file_out) + else: + text = self.comvert_to_ymal_comment(indent, node.text) + self.write_out(indent, text, file_out) + self.is_last_element_comment = True + + def recursion_in_xml_tree(self, depth, xml_tree, output_yml, verbose): + """ + Descend lower level in xml tree. If we are in the symbols branch, the recursive + behaviour is not triggered as we already handled the symbols' childs. + """ + + tree = xml_tree['tree'] + node = xml_tree['node'] + for child in list(node): + xml_tree_children = {'tree': tree, 'node': child} + self.xmlparse(output_yml, xml_tree_children, depth, verbose) + + # pylint: disable=too-many-branches, too-many-statements + def xmlparse(self, output_yml, xml_tree, depth, verbose): + """ + Main of the nxdl2yaml converter. + It parses XML tree, then prints recursively each level of the tree + """ + tree = xml_tree['tree'] + node = xml_tree['node'] + if verbose: + sys.stdout.write(f'Node tag: {remove_namespace_from_tag(node.tag)}\n') + sys.stdout.write(f'Attributes: {node.attrib}\n') + with open(output_yml, "a", encoding="utf-8") as file_out: + tag = remove_namespace_from_tag(node.tag) + if tag == 'definition': + self.found_definition = True + self.handle_definition(node) + # Taking care of root level doc and symbols + remove_cmnt_n = None + last_comment = '' + for child in list(node): + tag_tmp = remove_namespace_from_tag(child.tag) + if tag_tmp == CMNT_TAG and self.include_comment: + last_comment = self.comvert_to_ymal_comment(depth * DEPTH_SIZE, child.text) + remove_cmnt_n = child + if tag_tmp == 'doc': + self.store_root_level_comments('root_doc', last_comment) + last_comment = '' + self.handle_root_level_doc(child) + node.remove(child) + if remove_cmnt_n is not None: + node.remove(remove_cmnt_n) + remove_cmnt_n = None + if tag_tmp == 'symbols': + self.store_root_level_comments('symbols', last_comment) + last_comment = '' + self.handle_symbols(depth, child) + node.remove(child) + if remove_cmnt_n is not None: + node.remove(remove_cmnt_n) + remove_cmnt_n = None + + if tag == ('doc') and depth != 1: + parent = get_node_parent_info(tree, node)[0] + doc_parent = remove_namespace_from_tag(parent.tag) + if doc_parent != 'item': + self.handle_not_root_level_doc(depth, text=node.text, + tag=node.tag, + file_out=file_out) + + if self.found_definition is True and self.root_level_doc: + self.print_root_level_info(depth, file_out) + # End of print root-level definitions in file + if tag in ('field', 'group') and depth != 0: + self.handle_group_or_field(depth, node, file_out) + if tag == ('enumeration'): + self.handle_enumeration(depth, node, file_out) + if tag == ('attribute'): + self.handle_attributes(depth, node, file_out) + if tag == ('dimensions'): + self.handle_dimension(depth, node, file_out) + if tag == ('link'): + self.handel_link(depth, node, file_out) + if tag == ('choice'): + self.handel_choice(depth, node, file_out) + if tag == CMNT_TAG and self.include_comment: + self.handel_comment(depth, node, file_out) + depth += 1 + # Write nested nodes + self.recursion_in_xml_tree(depth, xml_tree, output_yml, verbose) + + +def compare_niac_and_my(tree, tree2, verbose, node, root_no_duplicates): + """This function creates two trees with Niac XML file and My XML file. +The main aim is to compare the two trees and create a new one that is the +union of the two initial trees. + +""" + root = tree.getroot() + root2 = tree2.getroot() + attrs_list_niac = [] + for nodo in root.iter(node): + attrs_list_niac.append(nodo.attrib) + if verbose: + sys.stdout.write('Attributes found in Niac file: \n') + sys.stdout.write(str(attrs_list_niac) + '\n') + sys.stdout.write(' \n') + sys.stdout.write('Started merging of Niac and My file... \n') + for elem in root.iter(node): + if verbose: + sys.stdout.write('- Niac element inserted: \n') + sys.stdout.write(str(elem.attrib) + '\n') + index = get_node_parent_info(tree, elem)[1] + root_no_duplicates.insert(index, elem) + + for elem2 in root2.iter(node): + index = get_node_parent_info(tree2, elem2)[1] + if elem2.attrib not in attrs_list_niac: + if verbose: + sys.stdout.write('- My element inserted: \n') + sys.stdout.write(str(elem2.attrib) + '\n') + root_no_duplicates.insert(index, elem2) + + if verbose: + sys.stdout.write(' \n') + return root_no_duplicates diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py new file mode 100644 index 0000000000..db4d4c4644 --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_forward_tools.py @@ -0,0 +1,1161 @@ +#!/usr/bin/env python3 +"""Creates an instantiated NXDL schema XML tree by walking the dictionary nest + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import xml.etree.ElementTree as ET +from xml.dom import minidom +import os +import textwrap + +import yaml + +from pynxtools.nexus import nexus +from pynxtools.nyaml2nxdl.comment_collector import CommentCollector +from pynxtools.dataconverter.helpers import remove_namespace_from_tag +from pynxtools.nyaml2nxdl.nyaml2nxdl_helper import (get_yaml_escape_char_reverter_dict, + nx_name_type_resolving, + cleaning_empty_lines, LineLoader) + + +# pylint: disable=too-many-lines, global-statement, invalid-name +DOM_COMMENT = ("\n" + "# NeXus - Neutron and X-ray Common Data Format\n" + "# \n" + "# Copyright (C) 2014-2022 NeXus International Advisory Committee (NIAC)\n" + "# \n" + "# This library is free software; you can redistribute it and/or\n" + "# modify it under the terms of the GNU Lesser General Public\n" + "# License as published by the Free Software Foundation; either\n" + "# version 3 of the License, or (at your option) any later version.\n" + "#\n" + "# This library is distributed in the hope that it will be useful,\n" + "# but WITHOUT ANY WARRANTY; without even the implied warranty of\n" + "# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU\n" + "# Lesser General Public License for more details.\n" + "#\n" + "# You should have received a copy of the GNU Lesser General Public\n" + "# License along with this library; if not, write to the Free Software\n" + "# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA\n" + "#\n" + "# For further information, see http://www.nexusformat.org\n") +NX_CLSS = nexus.get_nx_classes() +NX_NEW_DEFINED_CLASSES = ['NX_COMPLEX'] +NX_TYPE_KEYS = nexus.get_nx_attribute_type() +NX_ATTR_IDNT = '\\@' +NX_UNIT_IDNT = 'unit' +DEPTH_SIZE = " " +NX_UNIT_TYPES = nexus.get_nx_units() +COMMENT_BLOCKS: CommentCollector +CATEGORY = '' # Definition would be either 'base' or 'application' + + +def check_for_dom_comment_in_yaml(): + """Check the yaml file has dom comment or dom comment needed to be hard coded. + """ + dignature_keyword_list = ['NeXus', + 'GNU Lesser General Public', + 'Free Software Foundation', + 'Copyright (C)', + 'WITHOUT ANY WARRANTY'] + + # Check for dom comments in first three comments + dom_comment = '' + dom_comment_ind = 1 + for ind, comnt in enumerate(COMMENT_BLOCKS[0:5]): + cmnt_list = comnt.get_comment_text() + if len(cmnt_list) == 1: + text = cmnt_list[0] + else: + continue + dom_comment = text + dom_comment_ind = ind + for keyword in dignature_keyword_list: + if keyword not in text: + dom_comment = '' + break + if dom_comment: + break + + # deactivate the root dom_comment, So that the corresponding comment would not be + # considered as comment for definition xml element. + if dom_comment: + COMMENT_BLOCKS.remove_comment(dom_comment_ind) + + return dom_comment + + +def yml_reader(inputfile): + """ + This function launches the LineLoader class. + It parses the yaml in a dict and extends it with line tag keys for each key of the dict. + """ + global COMMENT_BLOCKS + with open(inputfile, "r", encoding="utf-8") as plain_text_yaml: + loader = LineLoader(plain_text_yaml) + loaded_yaml = loader.get_single_data() + COMMENT_BLOCKS = CommentCollector(inputfile, loaded_yaml) + COMMENT_BLOCKS.extract_all_comment_blocks() + dom_cmnt_frm_yaml = check_for_dom_comment_in_yaml() + global DOM_COMMENT + if dom_cmnt_frm_yaml: + DOM_COMMENT = dom_cmnt_frm_yaml + + if 'category' not in loaded_yaml.keys(): + raise ValueError("All definitions should be either 'base' or 'application' category. " + "No category has been found.") + global CATEGORY + CATEGORY = loaded_yaml['category'] + return loaded_yaml + + +def check_for_default_attribute_and_value(xml_element): + """NeXus Groups, fields and attributes might have xml default attributes and valuesthat must + come. For example: 'optional' which is 'true' by default for base class and false otherwise. + """ + + # base:Default attributes and value for all elements of base class except dimension element + base_attr_to_val = {'optional': 'true'} + + # application: Default attributes and value for all elements of application class except + # dimension element + application_attr_to_val = {'optional': 'false'} + + # Default attributes and value for dimension element + base_dim_attr_to_val = {'required': 'false'} + application_dim_attr_to_val = {'required': 'true'} + + # Eligible tag for default attr and value + elegible_tag = ['group', 'field', 'attribute'] + + def set_default_attribute(xml_elem, default_attr_to_val): + for deflt_attr, deflt_val in default_attr_to_val.items(): + if deflt_attr not in xml_elem.attrib \ + and 'maxOccurs' not in xml_elem.attrib \ + and 'minOccurs' not in xml_elem.attrib \ + and 'recommended' not in xml_elem.attrib: + xml_elem.set(deflt_attr, deflt_val) + + for child in list(xml_element): + # skiping comment 'function' that mainly collect comment from yaml file. + if not isinstance(child.tag, str): + continue + tag = remove_namespace_from_tag(child.tag) + + if tag == 'dim' and CATEGORY == 'base': + set_default_attribute(child, base_dim_attr_to_val) + if tag == 'dim' and CATEGORY == 'application': + set_default_attribute(child, application_dim_attr_to_val) + if tag in elegible_tag and CATEGORY == 'base': + set_default_attribute(child, base_attr_to_val) + if tag in elegible_tag and CATEGORY == 'application': + + set_default_attribute(child, application_attr_to_val) + check_for_default_attribute_and_value(child) + + +def yml_reader_nolinetag(inputfile): + """ + pyyaml based parsing of yaml file in python dict + """ + with open(inputfile, 'r', encoding="utf-8") as stream: + parsed_yaml = yaml.safe_load(stream) + return parsed_yaml + + +def check_for_skiped_attributes(component, value, allowed_attr=None, verbose=False): + """ + Check for any attributes have been skipped or not. + NOTE: We should keep in mind about 'doc' + """ + block_tag = ['enumeration'] + if value: + for attr, val in value.items(): + if attr in ['doc']: + continue + if '__line__' in attr or attr in block_tag: + continue + line_number = f'__line__{attr}' + if verbose: + print(f"__line__ : {value[line_number]}") + if not isinstance(val, dict) \ + and '\\@' not in attr\ + and attr not in allowed_attr\ + and 'NX' not in attr and val: + + raise ValueError(f"An attribute '{attr}' in part '{component}' has been found" + f". Please check arround line '{value[line_number]}. At this " + f"moment. The allowed attrbutes are {allowed_attr}") + + +def format_nxdl_doc(string): + """NeXus format for doc string + """ + string = check_for_mapping_char_other(string) + formatted_doc = '' + if "\n" not in string: + if len(string) > 80: + wrapped = textwrap.TextWrapper(width=80, + break_long_words=False, + replace_whitespace=False) + string = '\n'.join(wrapped.wrap(string)) + formatted_doc = '\n' + f"{string}" + else: + text_lines = string.split('\n') + text_lines = cleaning_empty_lines(text_lines) + formatted_doc += "\n" + "\n".join(text_lines) + if not formatted_doc.endswith("\n"): + formatted_doc += "\n" + return formatted_doc + + +def check_for_mapping_char_other(text): + """ + Check for mapping char \':\' which does not be passed through yaml library. + Then replace it by ':'. + """ + if not text: + text = '' + text = str(text) + if text == 'True': + text = 'true' + if text == 'False': + text = 'false' + # Some escape char is not valid in yaml libray which is written while writting + # yaml file. In the time of writting nxdl revert to that escape char. + escape_reverter = get_yaml_escape_char_reverter_dict() + for key, val in escape_reverter.items(): + if key in text: + text = text.replace(key, val) + return str(text).strip() + + +def xml_handle_doc(obj, value: str, + line_number=None, line_loc=None): + """This function creates a 'doc' element instance, and appends it to an existing element + + """ + # global comment_bolcks + doc_elemt = ET.SubElement(obj, 'doc') + text = format_nxdl_doc(check_for_mapping_char_other(value)).strip() + # To keep the doc middle of doc tag. + doc_elemt.text = f"\n{text}\n" + if line_loc is not None and line_number is not None: + xml_handle_comment(obj, line_number, + line_loc, doc_elemt) + + +def xml_handle_units(obj, value): + """This function creates a 'units' element instance, and appends it to an existing element + + """ + obj.set('units', str(value)) + + +# pylint: disable=too-many-branches +def xml_handle_exists(dct, obj, keyword, value): + """ + This function creates an 'exists' element instance, and appends it to an existing element + """ + line_number = f'__line__{keyword}' + assert value is not None, f'Line {dct[line_number]}: exists argument must not be None !' + if isinstance(value, list): + if len(value) == 4 and value[0] == 'min' and value[2] == 'max': + obj.set('minOccurs', str(value[1])) + if str(value[3]) != 'infty': + obj.set('maxOccurs', str(value[3])) + else: + obj.set('maxOccurs', 'unbounded') + elif len(value) == 2 and value[0] == 'min': + obj.set('minOccurs', str(value[1])) + elif len(value) == 2 and value[0] == 'max': + obj.set('maxOccurs', str(value[1])) + elif len(value) == 4 and value[0] == 'max' and value[2] == 'min': + obj.set('minOccurs', str(value[3])) + if str(value[1]) != 'infty': + obj.set('maxOccurs', str(value[3])) + else: + obj.set('maxOccurs', 'unbounded') + elif len(value) == 4 and (value[0] != 'min' or value[2] != 'max'): + raise ValueError(f'Line {dct[line_number]}: exists keyword' + f'needs to go either with an optional [recommended] list with two ' + f'entries either [min, ] or [max, ], or a list of four ' + f'entries [min, , max, ] !') + else: + raise ValueError(f'Line {dct[line_number]}: exists keyword ' + f'needs to go either with optional, recommended, a list with two ' + f'entries either [min, ] or [max, ], or a list of four ' + f'entries [min, , max, ] !') + else: + # This clause take optional in all concept except dimension where 'required' key is allowed + # not the 'optional' key. + if value == 'optional': + obj.set('optional', 'true') + elif value == 'recommended': + obj.set('recommended', 'true') + elif value == 'required': + obj.set('optional', 'false') + else: + obj.set('minOccurs', '0') + + +# pylint: disable=too-many-branches, too-many-locals, too-many-statements +def xml_handle_group(dct, obj, keyword, value, verbose=False): + """ + The function deals with group instances + """ + line_number = f'__line__{keyword}' + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + list_of_attr = ['name', 'type', 'nameType', 'deprecated', 'optional', 'recommended', + 'exists', 'unit'] + l_bracket = -1 + r_bracket = -1 + if keyword.count('(') == 1: + l_bracket = keyword.index('(') + if keyword.count(')') == 1: + r_bracket = keyword.index(')') + + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if not keyword_name and not keyword_type: + raise ValueError("A group must have both value and name. Check for group.") + grp = ET.SubElement(obj, 'group') + + if l_bracket == 0 and r_bracket > 0: + grp.set('type', keyword_type) + if keyword_name: + grp.set('name', keyword_name) + elif l_bracket > 0: + grp.set('name', keyword_name) + if keyword_type: + grp.set('type', keyword_type) + else: + grp.set('name', keyword_name) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == 'doc': + xml_handle_doc(grp, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr == 'exists' and vval: + xml_handle_exists(value, grp, attr, vval) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, + line_number, line_loc, grp) + elif attr == 'unit': + xml_handle_units(grp, vval) + xml_handle_comment(obj, line_number, line_loc, grp) + elif attr in list_of_attr and not isinstance(vval, dict) and vval: + validate_field_attribute_and_value(attr, vval, list_of_attr, value) + grp.set(attr, check_for_mapping_char_other(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, grp) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes('group', value, list_of_attr, verbose) + if isinstance(value, dict) and value != {}: + recursive_build(grp, value, verbose) + + +def xml_handle_dimensions(dct, obj, keyword, value: dict): + """ + This function creates a 'dimensions' element instance, and appends it to an existing element + + NOTE: we could create xml_handle_dim() function. + But, the dim elements in yaml file is defined as 'dim =[[index, value]]' + but dim has other attributes such as 'ref' and also might have doc as chlid. + so in that sense 'dim' should have come as dict keeping attributes and child as members of + dict. + Regarding this situation all the attributes of 'dimensions' and child 'doc' has been + included here. + + Other attributes, except 'index' and 'value', of 'dim' comes under nested dict named + 'dim_parameter: + incr:[...]' + """ + + possible_dimension_attrs = ['rank'] # nxdl attributes + line_number = f'__line__{keyword}' + line_loc = dct[line_number] + assert 'dim' in value.keys(), (f"Line {line_loc}: No dim as child of dimension has " + f"been found.") + xml_handle_comment(obj, line_number, line_loc) + dims = ET.SubElement(obj, 'dimensions') + # Consider all the childs under dimension is dim element and + # its attributes + + rm_key_list = [] + rank = '' + for key, val in value.items(): + if '__line__' in key: + continue + line_number = f"__line__{key}" + line_loc = value[line_number] + if key == 'rank': + rank = val or '' + if isinstance(rank, int) and rank < 0: + raise ValueError(f"Dimension must have some info about rank which is not " + f"available. Please check arround Line: {dct[line_number]}") + dims.set(key, str(val)) + rm_key_list.append(key) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, dims) + # Check dimension doc and handle it + elif key == 'doc' and isinstance(val, str): + xml_handle_doc(dims, val, line_number, line_loc) + rm_key_list.append(key) + rm_key_list.append(line_number) + elif key in possible_dimension_attrs and not isinstance(val, dict): + dims.set(key, str(val)) + rm_key_list.append(key) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, dims) + + for key in rm_key_list: + del value[key] + + xml_handle_dim_from_dimension_dict(dct, dims, keyword, value, rank=False) + + if isinstance(value, dict) and value != {}: + recursive_build(dims, value, verbose=None) + + +# pylint: disable=too-many-locals, too-many-arguments +def xml_handle_dim_from_dimension_dict(dct, dims_obj, keyword, value, rank, verbose=False): + """ + Handling dim element. + NOTE: The inputs 'keyword' and 'value' are as input for xml_handle_dimensions + function. please also read note in xml_handle_dimensions. + """ + + possible_dim_attrs = ['ref', 'incr', 'refindex', 'required'] + + # Some attributes might have equivalent name e.g. 'required' is correct one and + # 'optional' could be another name. Then change attribute to the correct one. + wrong_to_correct_attr = [('optional', 'required')] + header_line_number = f"__line__{keyword}" + dim_list = [] + rm_key_list = [] + # NOTE: dim doc and other attributes except 'index' and 'value' will come as list of value + # under dim_parameters + if not value: + return + rank = '' + # pylint: disable=too-many-nested-blocks + for attr, vvalue in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + # dim comes in precedence + if attr == 'dim': + # dim consists of list of [index, value] + llist_ind_value = vvalue + assert isinstance(llist_ind_value, list), (f'Line {value[line_number]}: dim' + f'argument not a list !') + xml_handle_comment(dims_obj, line_number, line_loc) + if isinstance(rank, int) and rank > 0: + assert rank == len(llist_ind_value), ( + f"Wrong dimension rank check around Line {dct[header_line_number]}.\n" + f"Line {[dct[header_line_number]]} rank value {rank} " + f"is not the same as dim array = " + f"{len(llist_ind_value)}.") + # Taking care of ind and value that comes as list of list + for dim_ind_val in llist_ind_value: + dim = ET.SubElement(dims_obj, 'dim') + + # Taking care of multidimensions or rank + if len(dim_ind_val) >= 1 and dim_ind_val[0]: + dim.set('index', str(dim_ind_val[0])) + if len(dim_ind_val) == 2 and dim_ind_val[1]: + dim.set('value', str(dim_ind_val[1])) + dim_list.append(dim) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr == 'dim_parameters' and isinstance(vvalue, dict): + xml_handle_comment(dims_obj, line_number, line_loc) + for kkkey, vvval in vvalue.items(): + if '__line__' in kkkey: + continue + cmnt_number = f'__line__{kkkey}' + cmnt_loc = vvalue[cmnt_number] + # Check whether any optional attributes added + for tuple_wng_crt in wrong_to_correct_attr: + if kkkey == tuple_wng_crt[0]: + raise ValueError(f"{cmnt_loc}: Attribute '{kkkey}' is prohibited, use " + f"'{tuple_wng_crt[1]}") + if kkkey == 'doc' and dim_list: + # doc comes as list of doc + for i, dim in enumerate(dim_list): + if isinstance(vvval, list) and i < len(vvval): + tmp_val = vvval[i] + xml_handle_doc(dim, vvval[i], cmnt_number, cmnt_loc) + # Check all the dim have doc if not skip + elif isinstance(vvval, list) and i >= len(vvval): + pass + else: + for i, dim in enumerate(dim_list): + # all atribute of dims comes as list + if isinstance(vvval, list) and i < len(vvval): + tmp_val = vvval[i] + dim.set(kkkey, str(tmp_val)) + + # Check all the dim have doc if not skip + elif isinstance(vvval, list) and i >= len(vvval): + pass + # All dim might have the same value for the same attribute + elif not isinstance(vvval, list): + tmp_val = value + dim.set(kkkey, str(tmp_val)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + else: + raise ValueError(f"Got unexpected block except 'dim' and 'dim_parameters'." + f"Please check arround line {line_number}") + + for key in rm_key_list: + del value[key] + + check_for_skiped_attributes('dim', value, possible_dim_attrs, verbose) + + +def xml_handle_enumeration(dct, obj, keyword, value, verbose): + """This function creates an 'enumeration' element instance. + + Two cases are handled: + 1) the items are in a list + 2) the items are dictionaries and may contain a nested doc + """ + line_number = f'__line__{keyword}' + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + enum = ET.SubElement(obj, 'enumeration') + + assert value is not None, f'Line {line_loc}: enumeration must \ +bear at least an argument !' + assert len( + value) >= 1, f'Line {dct[line_number]}: enumeration must not be an empty list!' + if isinstance(value, list): + for element in value: + itm = ET.SubElement(enum, 'item') + itm.set('value', str(element)) + if isinstance(value, dict) and value != {}: + for element in value.keys(): + if '__line__' not in element: + itm = ET.SubElement(enum, 'item') + itm.set('value', str(element)) + if isinstance(value[element], dict): + recursive_build(itm, value[element], verbose) + + +# pylint: disable=unused-argument +def xml_handle_link(dct, obj, keyword, value, verbose): + """ + If we have an NXDL link we decode the name attribute from (link)[:-6] + """ + + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + possible_attrs = ['name', 'target', 'napimount'] + name = keyword[:-6] + link_obj = ET.SubElement(obj, 'link') + link_obj.set('name', str(name)) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == 'doc': + xml_handle_doc(link_obj, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr in possible_attrs and not isinstance(vval, dict): + if vval: + link_obj.set(attr, str(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, link_obj) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes('link', value, possible_attrs, verbose) + + if isinstance(value, dict) and value != {}: + recursive_build(link_obj, value, verbose=None) + + +def xml_handle_choice(dct, obj, keyword, value, verbose=False): + """ + Build choice xml elements. That consists of groups. + """ + line_number = f'__line__{keyword}' + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + # Add attributes in possible if new attributs have been added nexus definition. + possible_attr = [] + choice_obj = ET.SubElement(obj, 'choice') + # take care of special attributes + name = keyword[:-8] + choice_obj.set('name', name) + + if value: + rm_key_list = [] + for attr, vval in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == 'doc': + xml_handle_doc(choice_obj, vval, line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr in possible_attr and not isinstance(vval, dict): + if vval: + choice_obj.set(attr, str(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, choice_obj) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes('choice', value, possible_attr, verbose) + + if isinstance(value, dict) and value != {}: + recursive_build(choice_obj, value, verbose=None) + + +def xml_handle_symbols(dct, obj, keyword, value: dict): + """Handle a set of NXDL symbols as a child to obj + + """ + line_number = f'__line__{keyword}' + line_loc = dct[line_number] + assert len(list(value.keys()) + ) >= 1, f'Line {line_loc}: symbols table must not be empty !' + xml_handle_comment(obj, line_number, line_loc) + syms = ET.SubElement(obj, 'symbols') + if 'doc' in value.keys(): + line_number = '__line__doc' + line_loc = value[line_number] + xml_handle_comment(syms, line_number, line_loc) + doctag = ET.SubElement(syms, 'doc') + doctag.text = '\n' + textwrap.fill(value['doc'], width=70) + '\n' + rm_key_list = [] + for kkeyword, vvalue in value.items(): + if '__line__' in kkeyword: + continue + if kkeyword != 'doc': + line_number = f'__line__{kkeyword}' + line_loc = value[line_number] + xml_handle_comment(syms, line_number, line_loc) + assert vvalue is not None and isinstance( + vvalue, str), f'Line {line_loc}: put a comment in doc string !' + sym = ET.SubElement(syms, 'symbol') + sym.set('name', str(kkeyword)) + # sym_doc = ET.SubElement(sym, 'doc') + xml_handle_doc(sym, vvalue) + rm_key_list.append(kkeyword) + rm_key_list.append(line_number) + # sym_doc.text = '\n' + textwrap.fill(vvalue, width=70) + '\n' + for key in rm_key_list: + del value[key] + + +def check_keyword_variable(verbose, dct, keyword, value): + """ + Check whether both keyword_name and keyword_type are empty, + and complains if it is the case + """ + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if verbose: + sys.stdout.write( + f'{keyword_name}({keyword_type}): value type is {type(value)}\n') + if keyword_name == '' and keyword_type == '': + line_number = f'__line__{keyword}' + raise ValueError(f'Line {dct[line_number]}: found an improper yaml key !') + + +def helper_keyword_type(kkeyword_type): + """ + This function is returning a value of keyword_type if it belong to NX_TYPE_KEYS + """ + if kkeyword_type in NX_TYPE_KEYS: + return kkeyword_type + return None + + +def verbose_flag(verbose, keyword, value): + """ + Verbose stdout printing for nested levels of yaml file, if verbose flag is active + """ + if verbose: + sys.stdout.write(f' key:{keyword}; value type is {type(value)}\n') + + +def xml_handle_attributes(dct, obj, keyword, value, verbose): + """Handle the attributes found connected to attribute field""" + + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + xml_handle_comment(obj, line_number, line_loc) + # list of possible attribute of xml attribute elementsa + attr_attr_list = ['name', 'type', 'unit', 'nameType', + 'optional', 'recommended', 'minOccurs', + 'maxOccurs', 'deprecated', 'exists'] + # as an attribute identifier + keyword_name, keyword_typ = nx_name_type_resolving(keyword) + line_number = f'__line__{keyword}' + if verbose: + print(f"__line__ : {dct[line_number]}") + if keyword_name == '' and keyword_typ == '': + raise ValueError(f'Line {dct[line_number]}: found an improper yaml key !') + elemt_obj = ET.SubElement(obj, 'attribute') + elemt_obj.set('name', keyword_name[2:]) + if keyword_typ: + elemt_obj.set('type', keyword_typ) + + rm_key_list = [] + if value and value: + # taking care of attributes of attributes + for attr, attr_val in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr in ['doc', *attr_attr_list] and not isinstance(attr_val, dict): + if attr == 'unit': + elemt_obj.set(f"{attr}s", str(value[attr])) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, elemt_obj) + elif attr == 'exists' and attr_val: + xml_handle_exists(value, elemt_obj, attr, attr_val) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, elemt_obj) + elif attr == 'doc': + xml_handle_doc(elemt_obj, format_nxdl_doc(attr_val), + line_number, line_loc) + rm_key_list.append(attr) + rm_key_list.append(line_number) + else: + elemt_obj.set(attr, check_for_mapping_char_other(attr_val)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, line_number, line_loc, elemt_obj) + + for key in rm_key_list: + del value[key] + # Check cor skiped attribute + check_for_skiped_attributes('Attribute', value, attr_attr_list, verbose) + if value: + recursive_build(elemt_obj, value, verbose) + + +def validate_field_attribute_and_value(v_attr, vval, allowed_attribute, value): + """ + Check for any attributes that comes with invalid name, + and invalid value. + """ + + # check for empty val + if (not isinstance(vval, dict) + and not str(vval)): # check for empty value + + line_number = f"__line__{v_attr}" + raise ValueError(f"In a field a valid attrbute ('{v_attr}') found that is not stored." + f" Please check arround line {value[line_number]}") + + # The bellow elements might come as child element + skipped_child_name = ['doc', 'dimension', 'enumeration', 'choice', 'exists'] + # check for invalid key or attributes + if (v_attr not in [*skipped_child_name, *allowed_attribute] + and '__line__' not in v_attr + and not isinstance(vval, dict) + and '(' not in v_attr # skip only groups and field that has name and type + and '\\@' not in v_attr): # skip nexus attributes + + line_number = f"__line__{v_attr}" + raise ValueError(f"In a field or group a invalid attribute ('{v_attr}') or child has found." + f" Please check arround line {value[line_number]}.") + + +def xml_handle_fields(obj, keyword, value, line_annot, line_loc, verbose=False): + """ + Handle a field in yaml file. + When a keyword is NOT: + symbol, + NX baseclass member, + attribute (\\@), + doc, + enumerations, + dimension, + exists, + then the not empty keyword_name is a field! + This simple function will define a new node of xml tree + """ + # List of possible attributes of xml elements + allowed_attr = ['name', 'type', 'nameType', 'unit', 'minOccurs', 'long_name', + 'axis', 'signal', 'deprecated', 'axes', 'exists', + 'data_offset', 'interpretation', 'maxOccurs', + 'primary', 'recommended', 'optional', 'stride'] + + xml_handle_comment(obj, line_annot, line_loc) + l_bracket = -1 + r_bracket = -1 + if keyword.count('(') == 1: + l_bracket = keyword.index('(') + if keyword.count(')') == 1: + r_bracket = keyword.index(')') + + keyword_name, keyword_type = nx_name_type_resolving(keyword) + if not keyword_type and not keyword_name: + raise ValueError("Check for name or type in field.") + elemt_obj = ET.SubElement(obj, 'field') + + # type come first + if l_bracket == 0 and r_bracket > 0: + elemt_obj.set('type', keyword_type) + if keyword_name: + elemt_obj.set('name', keyword_name) + elif l_bracket > 0: + elemt_obj.set('name', keyword_name) + if keyword_type: + elemt_obj.set('type', keyword_type) + else: + elemt_obj.set('name', keyword_name) + + if value: + rm_key_list = [] + # In each each if clause apply xml_handle_comment(), to collect + # comments on that yaml line. + for attr, vval in value.items(): + if '__line__' in attr: + continue + line_number = f"__line__{attr}" + line_loc = value[line_number] + if attr == 'doc': + xml_handle_doc(elemt_obj, vval, line_number, line_loc,) + rm_key_list.append(attr) + rm_key_list.append(line_number) + elif attr == 'exists' and vval: + xml_handle_exists(value, elemt_obj, attr, vval) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, + line_number, + line_loc, elemt_obj) + elif attr == 'unit': + xml_handle_units(elemt_obj, vval) + xml_handle_comment(obj, + line_number, + line_loc, elemt_obj) + elif attr in allowed_attr and not isinstance(vval, dict) and vval: + validate_field_attribute_and_value(attr, vval, allowed_attr, value) + elemt_obj.set(attr, check_for_mapping_char_other(vval)) + rm_key_list.append(attr) + rm_key_list.append(line_number) + xml_handle_comment(obj, + line_number, + line_loc, elemt_obj) + + for key in rm_key_list: + del value[key] + # Check for skipped attrinutes + check_for_skiped_attributes('field', value, allowed_attr, verbose) + + if isinstance(value, dict) and value != {}: + recursive_build(elemt_obj, value, verbose) + + +def xml_handle_comment(obj: ET.Element, + line_annotation: str, + line_loc_no: int, + xml_ele: ET.Element = None, + is_def_cmnt: bool = False): + """ + Add xml comment: check for comments that has the same 'line_annotation' + (e.g. __line__data) and the same line_loc_no (e.g. 30). After that, i + does of three tasks: + 1. Returns list of comments texts (multiple members if element has multiple comments) + 2. Rearrange comment element and xml_ele where comment comes first. + 3. Append comment element when no xml_ele will no be provided. + """ + + line_info = (line_annotation, int(line_loc_no)) + if line_info in COMMENT_BLOCKS: + cmnt = COMMENT_BLOCKS.get_coment_by_line_info(line_info) + cmnt_text = cmnt.get_comment_text() + + if is_def_cmnt: + return cmnt_text + if xml_ele is not None: + obj.remove(xml_ele) + for string in cmnt_text: + si_comnt = ET.Comment(string) + obj.append(si_comnt) + obj.append(xml_ele) + elif not is_def_cmnt and xml_ele is None: + for string in cmnt_text: + si_comnt = ET.Comment(string) + obj.append(si_comnt) + else: + raise ValueError("Provied correct parameter values.") + return '' + + +def recursive_build(obj, dct, verbose): + """obj is the current node of the XML tree where we want to append to, + dct is a dictionary object which represents the content of a child to obj + dct may contain further dictionary nests, representing NXDL groups, + which trigger recursive processing + NXDL fields may contain attributes but trigger no recursion so attributes are leafs. + + """ + for keyword, value in iter(dct.items()): + if '__line__' in keyword: + continue + line_number = f"__line__{keyword}" + line_loc = dct[line_number] + keyword_name, keyword_type = nx_name_type_resolving(keyword) + check_keyword_variable(verbose, dct, keyword, value) + if verbose: + sys.stdout.write( + f'keyword_name:{keyword_name} keyword_type {keyword_type}\n') + + if keyword[-6:] == '(link)': + xml_handle_link(dct, obj, keyword, value, verbose) + elif keyword[-8:] == '(choice)': + xml_handle_choice(dct, obj, keyword, value) + # The bellow xml_symbol clause is for the symbols that come ubde filed or attributes + # Root level symbols has been inside nyaml2nxdl() + elif keyword_type == '' and keyword_name == 'symbols': + xml_handle_symbols(dct, obj, keyword, value) + + elif ((keyword_type in NX_CLSS) or (keyword_type not in + [*NX_TYPE_KEYS, '', *NX_NEW_DEFINED_CLASSES])): + # we can be sure we need to instantiate a new group + xml_handle_group(dct, obj, keyword, value, verbose) + + elif keyword_name[0:2] == NX_ATTR_IDNT: # check if obj qualifies + xml_handle_attributes(dct, obj, keyword, value, verbose) + elif keyword == 'doc': + xml_handle_doc(obj, value, line_number, line_loc) + elif keyword == NX_UNIT_IDNT: + xml_handle_units(obj, value) + elif keyword == 'enumeration': + xml_handle_enumeration(dct, obj, keyword, value, verbose) + + elif keyword == 'dimensions': + xml_handle_dimensions(dct, obj, keyword, value) + + elif keyword == 'exists': + xml_handle_exists(dct, obj, keyword, value) + # Handles fileds e.g. AXISNAME + elif keyword_name != '' and '__line__' not in keyword_name: + xml_handle_fields(obj, keyword, + value, line_number, + line_loc, verbose) + else: + raise ValueError(f"An unfamiliar type of element {keyword} has been found which is " + f"not be able to be resolved. Chekc arround line {dct[line_number]}") + + +def pretty_print_xml(xml_root, output_xml, def_comments=None): + """ + Print better human-readable indented and formatted xml file using + built-in libraries and preceding XML processing instruction + """ + dom = minidom.parseString(ET.tostring( + xml_root, encoding='utf-8', method='xml')) + proc_instractionn = dom.createProcessingInstruction( + 'xml-stylesheet', 'type="text/xsl" href="nxdlformat.xsl"') + dom_comment = dom.createComment(DOM_COMMENT) + root = dom.firstChild + dom.insertBefore(proc_instractionn, root) + dom.insertBefore(dom_comment, root) + + if def_comments: + for string in def_comments: + def_comt_ele = dom.createComment(string) + dom.insertBefore(def_comt_ele, root) + + xml_string = dom.toprettyxml(indent=1 * DEPTH_SIZE, newl='\n', encoding='UTF-8') + with open('tmp.xml', "wb") as file_tmp: + file_tmp.write(xml_string) + flag = False + with open('tmp.xml', "r", encoding="utf-8") as file_out: + with open(output_xml, "w", encoding="utf-8") as file_out_mod: + for i in file_out.readlines(): + if '' not in i and '' not in i and flag is False: + file_out_mod.write(i) + elif '' in i and '' in i: + file_out_mod.write(i) + elif '' in i and '' not in i: + flag = True + white_spaces = len(i) - len(i.lstrip()) + file_out_mod.write(i) + elif '' not in i and '' not in i and flag is True: + file_out_mod.write((white_spaces + 5) * ' ' + i) + elif '' not in i and '' in i and flag is True: + file_out_mod.write(white_spaces * ' ' + i) + flag = False + os.remove('tmp.xml') + + +# pylint: disable=too-many-statements +def nyaml2nxdl(input_file: str, out_file, verbose: bool): + """ + Main of the nyaml2nxdl converter, creates XML tree, namespace and + schema, definitions then evaluates a dictionary nest of groups recursively and + fields or (their) attributes as childs of the groups + """ + + def_attributes = ['deprecated', 'ignoreExtraGroups', 'category', 'type', + 'ignoreExtraFields', 'ignoreExtraAttributes', 'restricts'] + yml_appdef = yml_reader(input_file) + def_cmnt_text = [] + if verbose: + sys.stdout.write(f'input-file: {input_file}\n') + sys.stdout.write('application/base contains the following root-level entries:\n') + sys.stdout.write(str(yml_appdef.keys())) + xml_root = ET.Element('definition', {}) + assert 'category' in yml_appdef.keys( + ), 'Required root-level keyword category is missing!' + assert yml_appdef['category'] in ['application', 'base'], 'Only \ +application and base are valid categories!' + assert 'doc' in yml_appdef.keys(), 'Required root-level keyword doc is missing!' + + name_extends = '' + yml_appdef_copy = yml_appdef.copy() + for kkey, vvalue in yml_appdef_copy.items(): + if '__line__' in kkey: + continue + line_number = f"__line__{kkey}" + line_loc_no = yml_appdef[line_number] + if not isinstance(vvalue, dict) and kkey in def_attributes: + xml_root.set(kkey, str(vvalue) or '') + cmnt_text = xml_handle_comment(xml_root, + line_number, line_loc_no, + is_def_cmnt=True) + def_cmnt_text += cmnt_text if cmnt_text else [] + + del yml_appdef[line_number] + del yml_appdef[kkey] + # Taking care or name and extends + elif 'NX' in kkey: + # Tacking the attribute order but the correct value will be stored later + # check for name first or type first if (NXobject)NXname then type first + l_bracket_ind = kkey.rfind('(') + r_bracket_ind = kkey.rfind(')') + if l_bracket_ind == 0: + extend = kkey[1:r_bracket_ind] + name = kkey[r_bracket_ind + 1:] + xml_root.set('extends', extend) + xml_root.set('name', name) + elif l_bracket_ind > 0: + name = kkey[0:l_bracket_ind] + extend = kkey[l_bracket_ind + 1: r_bracket_ind] + xml_root.set('name', name) + xml_root.set('extends', extend) + else: + name = kkey + xml_root.set('name', name) + xml_root.set('extends', 'NXobject') + cmnt_text = xml_handle_comment(xml_root, + line_number, line_loc_no, + is_def_cmnt=True) + def_cmnt_text += cmnt_text if cmnt_text else [] + + name_extends = kkey + + if 'type' not in xml_root.attrib: + xml_root.set('type', "group") + # Taking care of namespaces + namespaces = {'xmlns': 'http://definition.nexusformat.org/nxdl/3.1', + 'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance', + 'xsi:schemaLocation': 'http://definition.nexusformat.org/nxdl/3.1 ../nxdl.xsd'} + for key, ns_ in namespaces.items(): + xml_root.attrib[key] = ns_ + # Taking care of Symbols elements + if 'symbols' in yml_appdef.keys(): + xml_handle_symbols(yml_appdef, + xml_root, + 'symbols', + yml_appdef['symbols']) + + del yml_appdef['symbols'] + del yml_appdef["__line__symbols"] + + assert isinstance(yml_appdef['doc'], str) and yml_appdef['doc'] != '', 'Doc \ +has to be a non-empty string!' + + line_number = '__line__doc' + line_loc_no = yml_appdef[line_number] + xml_handle_doc(xml_root, yml_appdef['doc'], line_number, line_loc_no) + + del yml_appdef['doc'] + + root_keys = 0 + for key in yml_appdef.keys(): + if '__line__' not in key: + root_keys += 1 + extra_key = key + + assert root_keys == 1, (f"Accepting at most keywords: category, doc, symbols, and NX... " + f"at root-level! check key at root level {extra_key}") + + assert ('NX' in name_extends and len(name_extends) > 2), 'NX \ +keyword has an invalid pattern, or is too short!' + # Taking care if definition has empty content + if yml_appdef[name_extends]: + recursive_build(xml_root, yml_appdef[name_extends], verbose) + # Taking care of comments that comes at the end of file that is might not be intended for + # any nxdl elements. + if COMMENT_BLOCKS[-1].has_post_comment: + post_comment = COMMENT_BLOCKS[-1] + (lin_annot, line_loc) = post_comment.get_line_info() + xml_handle_comment(xml_root, lin_annot, line_loc) + + # Note: Just to keep the functionality if we need this functionality later. + default_attr = False + if default_attr: + check_for_default_attribute_and_value(xml_root) + pretty_print_xml(xml_root, out_file, def_cmnt_text) + if verbose: + sys.stdout.write('Parsed YAML to NXDL successfully\n') diff --git a/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py b/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py new file mode 100644 index 0000000000..58d634c9d8 --- /dev/null +++ b/dev_tools/nyaml2nxdl/nyaml2nxdl_helper.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +"""Main file of yaml2nxdl tool. +Users create NeXus instances by writing a YAML file +which details a hierarchy of data/metadata elements + +""" +# -*- coding: utf-8 -*- +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# Yaml library does not except the keys (escapechar "\t" and yaml separator ":") +# So the corresponding value is to skip them and +# and also carefull about this order +import hashlib +from yaml.composer import Composer +from yaml.constructor import Constructor + +from yaml.nodes import ScalarNode +from yaml.resolver import BaseResolver +from yaml.loader import Loader + +# NOTE: If any one change one of the bellow dict please change it for both +ESCAPE_CHAR_DICT_IN_YAML = {"\t": " ", + "\':\'": ":"} + +ESCAPE_CHAR_DICT_IN_XML = {" ": "\t", + "\':\'": ":"} + + +def remove_namespace_from_tag(tag): + """Helper function to remove the namespace from an XML tag.""" + + return tag.split("}")[-1] + + +class LineLoader(Loader): # pylint: disable=too-many-ancestors + """ + LineLoader parses a yaml into a python dictionary extended with extra items. + The new items have as keys __line__ and as values the yaml file line number + """ + + def compose_node(self, parent, index): + # the line number where the previous token has ended (plus empty lines) + node = Composer.compose_node(self, parent, index) + node.__line__ = self.line + 1 + return node + + def construct_mapping(self, node, deep=False): + node_pair_lst = node.value + node_pair_lst_for_appending = [] + + for key_node in node_pair_lst: + shadow_key_node = ScalarNode( + tag=BaseResolver.DEFAULT_SCALAR_TAG, value='__line__' + key_node[0].value) + shadow_value_node = ScalarNode( + tag=BaseResolver.DEFAULT_SCALAR_TAG, value=key_node[0].__line__) + node_pair_lst_for_appending.append( + (shadow_key_node, shadow_value_node)) + + node.value = node_pair_lst + node_pair_lst_for_appending + return Constructor.construct_mapping(self, node, deep=deep) + + +def get_yaml_escape_char_dict(): + """Get escape char and the way to skip them in yaml.""" + return ESCAPE_CHAR_DICT_IN_YAML + + +def get_yaml_escape_char_reverter_dict(): + """To revert yaml escape char in xml constructor from yaml.""" + + return ESCAPE_CHAR_DICT_IN_XML + + +def type_check(nx_type): + """ + Check for nexus type if type is NX_CHAR get '' or get as it is. + """ + + if nx_type in ['NX_CHAR', '']: + nx_type = '' + else: + nx_type = f"({nx_type})" + return nx_type + + +def get_node_parent_info(tree, node): + """ + Return tuple of (parent, index) where: + parent = node of parent within tree + index = index of node under parent + """ + + parent_map = {c: p for p in tree.iter() for c in p} + parent = parent_map[node] + return parent, list(parent).index(node) + + +def cleaning_empty_lines(line_list): + """ + Cleaning up empty lines on top and bottom. + """ + if not isinstance(line_list, list): + line_list = line_list.split('\n') if '\n' in line_list else [''] + + # Clining up top empty lines + while True: + if line_list[0].strip(): + break + line_list = line_list[1:] + if len(line_list) == 0: + line_list.append('') + return line_list + + # Clining bottom empty lines + while True: + if line_list[-1].strip(): + break + line_list = line_list[0:-1] + if len(line_list) == 0: + line_list.append('') + return line_list + + return line_list + + +def nx_name_type_resolving(tmp): + """ + extracts the eventually custom name {optional_string} + and type {nexus_type} from a YML section string. + YML section string syntax: optional_string(nexus_type) + """ + if tmp.count('(') == 1 and tmp.count(')') == 1: + # we can safely assume that every valid YML key resolves + # either an nx_ (type, base, candidate) class contains only 1 '(' and ')' + index_start = tmp.index('(') + index_end = tmp.index(')', index_start + 1) + typ = tmp[index_start + 1:index_end] + nam = tmp.replace('(' + typ + ')', '') + return nam, typ + + # or a name for a member + typ = '' + nam = tmp + return nam, typ + + +def get_sha256_hash(file_name): + """Generate a sha256_hash for a given file. + """ + sha_hash = hashlib.sha256() + + with open(file=file_name, mode='rb',) as file_obj: + # Update hash for each 4k block of bytes + for b_line in iter(lambda: file_obj.read(4096), b""): + sha_hash.update(b_line) + return sha_hash.hexdigest() + + +def extend_yamlfile_with_comment(yaml_file, + file_to_be_appended, + top_lines_list=None): + """Extend yaml file by the file_to_be_appended as comment. + """ + + with open(yaml_file, mode='a+', encoding='utf-8') as f1_obj: + if top_lines_list: + for line in top_lines_list: + f1_obj.write(line) + + with open(file_to_be_appended, mode='r', encoding='utf-8') as f2_obj: + lines = f2_obj.readlines() + for line in lines: + f1_obj.write(f"# {line}") + + +def separate_hash_yaml_and_nxdl(yaml_file, sep_yaml, sep_xml): + """Separate the provided yaml file into yaml, nxdl and hash if yaml was extended with + nxdl at the end of yaml by + '\n# ++++++++++++++++++++++++++++++++++ SHA HASH \ + ++++++++++++++++++++++++++++++++++\n' + # ' + """ + sha_hash = '' + with open(yaml_file, 'r', encoding='utf-8') as inp_file: + lines = inp_file.readlines() + # file to write yaml part + with open(sep_yaml, 'w', encoding='utf-8') as yml_f_ob, \ + open(sep_xml, 'w', encoding='utf-8') as xml_f_ob: + + last_line = '' + write_on_yaml = True + for ind, line in enumerate(lines): + if ind == 0: + last_line = line + # Write in file when ensured that the nest line is not with '++ SHA HASH ++' + elif '++ SHA HASH ++' not in line and write_on_yaml: + yml_f_ob.write(last_line) + last_line = line + elif '++ SHA HASH ++' in line: + write_on_yaml = False + last_line = '' + elif not write_on_yaml and not last_line: + # The first line of xml file has been found. Onward write lines directly + # into xml file. + if not sha_hash: + sha_hash = line.split('# ', 1)[-1].strip() + else: + xml_f_ob.write(line[2:]) + # If the yaml fiile does not contain any hash for nxdl then we may have last line. + if last_line: + yml_f_ob.write(last_line) + + return sha_hash diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000..baa6afee73 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,43 @@ +[build-system] +requires = ["setuptools>=64.0.1", "setuptools-scm[toml]>=6.2"] +build-backend = "setuptools.build_meta" + +[project] +name = "nexusdefinitions" +dynamic = ["version"] +authors = [ + { name = "NIAC" } +] +description = "Nexus definitions" +readme = "README.md" +license = { file = "LGPL.txt" } +requires-python = "" +classifiers = [ + "Operating System :: OS Independent" +] +dependencies = [ + "lxml", + "pyyaml", + "click>=7.1.2", + "h5py>=3.6.0", + "sphinx>=5", + "sphinx-tabs", + "pytest", + "black>=22.3", + "flake8>=4", + "isort>=5.10", +] + +[project.urls] +"Homepage" = "https://nexusformat.org" + +[project.scripts] +read_nexus = "dev_tools.utils.nexus:main" +nyaml2nxdl = "dev_tools.nyaml2nxdl.nyaml2nxdl:launch_tool" + +[tools.setuptools_scm] +version_scheme = "guess-next-dev" +local_scheme = "node-and-date" + +[tool.setuptools] +packages = ["dev_tools"]