diff --git a/pyang/plugins/opc_ua.py b/pyang/plugins/opc_ua.py new file mode 100644 index 00000000..691dc583 --- /dev/null +++ b/pyang/plugins/opc_ua.py @@ -0,0 +1,332 @@ +"""opc_ua output plugin +1) Invoke with: +>pyang -f opc_ua > +""" + +from pyang import error +from pyang import plugin + + +def pyang_plugin_init(): + plugin.register_plugin(OPCUAPlugin()) + + +yang_to_opcua_data_type = { + "binary": "ByteString", + "bits": "BitFieldMaskDataType", + "boolean": "Boolean", + "decimal64": "Double", + "empty": "BaseObjectType", + "enumeration": "EnumValueType", + "int8": "SByte", + "int16": "Int16", + "int32": "Int32", + "int64": "Int64", + "string": "String", + "uint8": "Byte", + "uint16": "UInt16", + "uint32": "UInt32", + "uint64": "UInt64", + "date-and-time": "DateTime", + "union": "Union" +} + + +class OPCUAPlugin(plugin.PyangPlugin): + def add_output_format(self, fmts): + self.multiple_modules = True + fmts['opc_ua'] = self + + def pre_validate(self, ctx, modules): + module = modules[0] + self.mods = [module.arg] + [i.arg for i in module.search('include')] + + def emit(self, ctx, modules, fd): + for epos, etag, eargs in ctx.errors: + if ((epos.top is None or epos.top.arg in self.mods) and + error.is_error(error.err_level(etag))): + self.fatal("%s contains errors" % epos.top.arg) + + opc_ua_doc = OPCUAEmitter(ctx) + opc_ua_doc.emit(modules, fd) + + def fatal(self, exitCode=1): + raise error.EmitError(self, exitCode) + + +def convert_to_camel_case(text): + return ''.join(word.capitalize() for word in text.split('-')) + + +class OPCUAEmitter: + def __init__(self, ctx): + self._ctx = ctx + + def emit(self, modules, fd): + for module in modules: + self.emit_module_header(module, fd) + for s in module.substmts: + self.emit_stmt(module, s, fd) + self.emit_module_footer(module, fd) + + # print('!!!!!!!!!! Substatements !!!!!!!!!') + # for s in module.substmts: + # print(f'Statemet {s}') + # if s.keyword == 'container': + # print('!!!!!!!!!!! Substatements !!!!!!!!!!') + # for substmt in s.substmts: + # print(f'sub - {substmt}') + # if substmt.keyword == 'leaf': + # val = substmt.search_one('type') + # print(f'Type of substatement - {val}') + # print(f'Value of substatement - {val.arg}') + + def emit_stmt(self, mod, stmt, fd): + if stmt.keyword == 'container': + self.emit_container(stmt, fd, 4) + + elif stmt.keyword == 'leaf' or stmt.keyword == 'leaf-list': + self.emit_leaf(stmt, fd, 4) + + elif stmt.keyword == 'typedef': + self.emit_typedef(stmt, fd, 4) + + elif stmt.keyword == 'rpc': + self.emit_rpc(stmt, fd, 4) + + elif stmt.keyword == 'grouping': + self.emit_grouping(stmt, fd, 4) + + elif stmt.keyword == 'list': + self.emit_list(stmt, fd, 4) + + def emit_child_stmt(self, parent, node, fd, num_spaces): + if node.keyword == 'leaf' or node.keyword == 'leaf-list': + if parent.keyword == 'rpc': + self.emit_leaf(node, fd, num_spaces + 2, is_rpc=True) + else: + self.emit_leaf(node, fd, num_spaces + 2, is_rpc=False) + + elif node.keyword == 'container': + self.emit_container(node, fd, num_spaces + 2) + + elif node.keyword == 'rpc': + self.emit_rpc(node, fd, num_spaces + 2) + + elif node.keyword == 'uses': + parent_name = node.arg + self.emit_uses(node, fd, num_spaces + 2, parent_name) + + elif node.keyword == 'list': + self.emit_list(node, fd, num_spaces + 2) + + def emit_module_header(self, module, fd): + namespace = 'http://opcfoundation.org/UA/default/' + namespace_name = '' + version = '1.0' + prefix = '' + if module.search_one('namespace') is not None: + namespace = module.search_one('namespace').arg + if module.search_one('revision') is not None: + version = module.search_one('revision').arg + if module.search_one('prefix') is not None: + prefix = module.search_one('prefix').arg + + if len(namespace.split("/")) > 1: + namespace_name = namespace.split("/")[-2] + else: + namespace_name = namespace.split(":")[-1] + fd.write('\n' + '\n\n' % (namespace, namespace, version)) + + fd.write(' \n' + ' %s\n' + '%s' + ' \n\n' % ( + namespace_name, prefix, prefix, namespace, self.emit_import_include(module, fd))) + + def emit_import_include(self, module, fd): + namespace = 'http://opcfoundation.org/UA/default/' + temp_str = '' + if module.search('import') is not None or module.search('include') is not None: + for stmt in module.substmts: + if stmt.keyword == 'import' or stmt.keyword == 'include': + for s in stmt.substmts: + import_name = stmt.arg + prefix = s.arg + temp_str += f' {namespace}\n' + + return temp_str + + def emit_module_footer(self, module, fd): + fd.write('') + + def emit_container(self, node, fd, num_space): + spaces = ' ' * num_space + container_name = node.arg + fd.write( + f'{spaces}\n' + f'{" " * (num_space + 2)}\n') + + for s in node.substmts: + self.emit_child_stmt(node, s, fd, num_space + 4) + + fd.write(f'{" " * (num_space + 2)}\n' + f'{spaces}\n\n' + f'{spaces}\n' + f'{" " * (num_space + 2)}\n' + f'{" " * (num_space + 4)}\n' + f'{" " * (num_space + 6)}\n' + f'{" " * (num_space + 8)}SignalTo\n' + f'{" " * (num_space + 6)}\n' + f'{" " * (num_space + 4)}\n' + f'{" " * (num_space + 2)}\n' + f'{spaces}\n') + + def emit_leaf(self, node, fd, num_space, is_rpc=False): + spaces = ' ' * num_space + leaf_name = node.arg + leaf_type = yang_to_opcua_data_type.get(node.search_one('type').arg, 'Unknown OPC UA data type') + leaf_default_value = None + leaf_description = None + leaf_property = 'Property' + + if node.search_one('config') is not None and node.search_one('config').arg == 'true': + leaf_property = 'Variable' + + if is_rpc: + leaf_property = 'Argument' + + if node.search_one('default') is not None: + leaf_default_value = node.search_one('default').arg + + if node.search_one('description') is not None: + leaf_description = node.search_one('description').arg + + fd.write( + f'{spaces} \n') + if leaf_default_value is not None: + fd.write( + f'{spaces} {leaf_default_value} \n') + if leaf_description is not None: + fd.write(f'{spaces} {leaf_description}\n') + fd.write(f'{spaces}\n') + + def emit_rpc(self, node, fd, num_space): + spaces = ' ' * num_space + rpc_name = node.arg + fd.write( + f'\n{spaces}\n') + + for stmt in node.substmts: + if stmt.keyword == 'input': + for s in stmt.substmts: + fd.write(f'{" " * (num_space + 2)}\n') + self.emit_child_stmt(node, s, fd, num_space + 4) + fd.write(f'{" " * (num_space + 2)}\n') + + elif stmt.keyword == 'output': + for s in stmt.substmts: + fd.write(f'{" " * (num_space + 2)}\n') + self.emit_child_stmt(node, s, fd, num_space + 4) + fd.write(f'{" " * (num_space + 2)}\n') + + fd.write(f'{spaces}\n\n') + + def emit_typedef(self, node, fd, num_space): + spaces = ' ' * num_space + typedef_name = node.arg + typedef_default_value = None + typedef_description = None + typedef_type = None + + if node.search_one('default') is not None: + typedef_default_value = node.search_one('default').arg + + if node.search_one('description') is not None: + typedef_description = node.search_one('description').arg + + if node.search_one('type') is not None: + if node.search_one('type').arg == 'enumeration': + typedef_type = 'Enumeration' + fd.write( + f'{spaces} \n' + f'{spaces} \n') + + for stmt in node.substmts: + enum_names = stmt.search('enum') + k = 0 + for s in stmt.substmts: + fd.write(f'{spaces} \n') + fd.write(f'{spaces} \n' + f'{spaces}\n') + return + + else: + typedef_type = yang_to_opcua_data_type.get(node.search_one('type').arg, "Unknown OpcUa data type") + if typedef_type != 'Unknown OpcUa data type': + yang_to_opcua_data_type[typedef_type] = typedef_type + + fd.write( + f'{spaces} \n') + if typedef_default_value is not None: + fd.write( + f'{spaces} {typedef_default_value} \n') + if typedef_description is not None: + fd.write(f'{spaces} {typedef_description}\n') + fd.write(f'{spaces}\n') + + def emit_uses(self, node, fd, num_space, parent_name): + spaces = ' ' * num_space + grouping_name = node.arg + fd.write( + f'{spaces}\n' + f'{spaces}\n') + + def emit_list(self, node, fd, num_space): + spaces = ' ' * num_space + list_name = node.arg + + fd.write( + f'{spaces}\n' + f'{" " * (num_space + 2)}\n') + + for s in node.substmts: + self.emit_child_stmt(node, s, fd, num_space + 4) + + fd.write(f'{" " * (num_space + 2)}\n' + f'{spaces}\n' + f'{spaces}\n' + f'{" " * (num_space + 2)}\n' + f'{" " * (num_space + 4)}\n' + f'{" " * (num_space + 2)}\n' + f'{spaces}\n') + + def emit_grouping(self, node, fd, num_space): + spaces = ' ' * num_space + grouping_name = node.arg + fd.write( + f'{spaces}\n' + f'{" " * (num_space + 2)}\n') + + for s in node.substmts: + self.emit_child_stmt(node, s, fd, num_space + 2) + + fd.write(f'{" " * (num_space + 2)}\n' + f'{spaces}\n\n')