From c797612d606567f2836ef431b0f807a55da65bcf Mon Sep 17 00:00:00 2001 From: Clemens Vasters Date: Thu, 22 Feb 2024 17:42:46 +0100 Subject: [PATCH] XSD and better dependency resolution Signed-off-by: Clemens Vasters --- avrotize/common.py | 32 +++ avrotize/dependency_resolver.py | 45 ++-- avrotize/jsonstoavro.py | 83 +++----- avrotize/xsdtoavro.py | 352 ++++++++++++++++++++------------ 4 files changed, 302 insertions(+), 210 deletions(-) create mode 100644 avrotize/common.py diff --git a/avrotize/common.py b/avrotize/common.py new file mode 100644 index 0000000..8b94943 --- /dev/null +++ b/avrotize/common.py @@ -0,0 +1,32 @@ +import re + +def avro_name(name): + """Convert a name into an Avro name.""" + val = re.sub(r'[^a-zA-Z0-9_]', '_', name) + if re.match(r'^[0-9]', val): + val = '_' + val + return val + +def generic_type() -> list[str | dict]: + simple_type_union: list[str | dict] = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"] + l2 = simple_type_union.copy() + l2.extend([ + { + "type": "array", + "items": simple_type_union + }, + { + "type": "map", + "values": simple_type_union + }]) + l1 = simple_type_union.copy() + l1.extend([ + { + "type": "array", + "items": l2 + }, + { + "type": "map", + "values": l2 + }]) + return l1 \ No newline at end of file diff --git a/avrotize/dependency_resolver.py b/avrotize/dependency_resolver.py index 2bd0fdd..5d87fba 100644 --- a/avrotize/dependency_resolver.py +++ b/avrotize/dependency_resolver.py @@ -63,10 +63,13 @@ def sort_messages_by_dependencies(avro_schema): remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]] if len(remaining_deps) > 0: swap_record_dependencies(avro_schema, record) - remaining_remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]] - found = len(remaining_deps) != len(remaining_remaining_deps) - if found: - break + if isinstance(record, dict) and not 'dependencies' in record: + found = True + else: + remaining_remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]] + found = len(remaining_deps) != len(remaining_remaining_deps) + if found: + break if not found: found = False @@ -92,15 +95,18 @@ def sort_messages_by_dependencies(avro_schema): return sorted_messages def swap_record_dependencies(avro_schema, record): - for dependency in record.get('dependencies', []): - dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None) - if not dependency_type: - continue - deps = record.get('dependencies', []) - for field in record['fields']: - if record['name'] != dependency and (record.get('namespace','')+'.'+record['name']) != dependency: - swap_dependency_type(avro_schema, field, dependency, dependency_type, deps) - record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep] + while 'dependencies' in record and len(record['dependencies']) > 0: + for dependency in record.get('dependencies', []): + dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None) + if not dependency_type and dependency in record['dependencies']: + record['dependencies'].remove(dependency) + deps = record.get('dependencies', []) + for field in record['fields']: + if record['name'] != dependency and (record.get('namespace','')+'.'+record['name']) != dependency: + swap_dependency_type(avro_schema, field, dependency, dependency_type, deps) + record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep] + if len(record['dependencies']) == 0: + del record['dependencies'] def swap_dependency_type(avro_schema, field, dependency, dependency_type, dependencies): @@ -119,7 +125,7 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] + # type is a Union? elif isinstance(field['type'], list): for field_type in field['type']: @@ -132,12 +138,17 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] # type is an object? elif isinstance(field_type, dict) and 'type' in field_type and field_type.get('type') == dependency or \ 'items' in field_type and field_type.get('items') == dependency or \ 'values' in field_type and field_type.get('values') == dependency: swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies) + elif isinstance(field_type, list): + for item in field_type: + if item == dependency or isinstance(item, dict) and item.get('type') == dependency: + swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies) + elif isinstance(field_type, dict) and 'type' in field_type: + swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies) elif isinstance(field['type'], dict) and 'type' in field['type']: swap_dependency_type(avro_schema, field['type'], dependency, dependency_type, dependencies) elif field['type'] == 'array': @@ -152,7 +163,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] elif isinstance(item, dict) and 'type' in item and item.get('type') == dependency: swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies) elif field['items'] == dependency: @@ -163,7 +173,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] elif 'type' in field['items']: swap_dependency_type(avro_schema, field['items'], dependency, dependency_type, dependencies) elif field['type'] == 'map': @@ -178,7 +187,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] elif isinstance(item, dict) and 'type' in item and item.get('type') == dependency: swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies) if field['values'] == dependency: @@ -189,7 +197,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend dependencies.extend(dependency_type.get('dependencies', [])) if 'dependencies' in dependency_type: swap_record_dependencies(avro_schema, dependency_type) - del dependency_type['dependencies'] elif 'type' in field['values']: swap_dependency_type(avro_schema, field['values'], dependency, dependency_type, dependencies) elif field['type'] == 'record': diff --git a/avrotize/jsonstoavro.py b/avrotize/jsonstoavro.py index d95b742..5a7d2fa 100644 --- a/avrotize/jsonstoavro.py +++ b/avrotize/jsonstoavro.py @@ -5,23 +5,16 @@ import jsonpointer import requests import copy +from avrotize.common import avro_name, generic_type from avrotize.dependency_resolver import inline_dependencies_of, sort_messages_by_dependencies from urllib.parse import ParseResult, urljoin, urlparse class JsonToAvroConverter: def __init__(self) -> None: - self.imported_types = {} + self.imported_types: Dict[Any, Any] = {} self.root_namespace = "example.com" - - def avro_name(self, name): - """Convert a name into an Avro name.""" - val = re.sub(r'[^a-zA-Z0-9_]', '_', name) - if re.match(r'^[0-9]', val): - val = '_' + val - return val - def is_empty_type(self, avro_type): """Check if the Avro type is an empty type.""" if len(avro_type) == 0: @@ -80,7 +73,7 @@ def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str | if 'type' in merged_schema and schema['type'] != merged_schema['type']: merged_schema['type'] = [schema['type'],merged_schema['type']] if not type_name: - merged_schema['name'] = self.avro_name(merged_schema.get('name','') + schema.get('name','')) + merged_schema['name'] = avro_name(merged_schema.get('name','') + schema.get('name','')) if 'fields' in schema: if 'fields' in merged_schema: for field in schema['fields']: @@ -98,35 +91,11 @@ def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str | return merged_schema - def generic_type(self) -> list[str | dict]: - simple_type_union: list[str | dict] = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"] - l2 = simple_type_union.copy() - l2.extend([ - { - "type": "array", - "items": simple_type_union - }, - { - "type": "map", - "values": simple_type_union - }]) - l1 = simple_type_union.copy() - l1.extend([ - { - "type": "array", - "items": l2 - }, - { - "type": "map", - "values": l2 - }]) - return l1 - def ensure_type(self, type: dict | str | list) -> dict | str | list: if isinstance(type, str) or isinstance(type, list) or 'type' in type: return type - type['type'] = self.generic_type() + type['type'] = generic_type() return type @@ -167,11 +136,11 @@ def json_schema_primitive_to_avro_type(self, json_primitive: str | list, format: if enum: # replace white space with underscore - enum = [self.avro_name(e) for e in enum if isinstance(e, str) and e != ""] + enum = [avro_name(e) for e in enum if isinstance(e, str) and e != ""] # purge duplicates enum = list(dict.fromkeys(enum)) if len(enum) > 0: - avro_primitive = {"type": "enum", "symbols": enum, "name": self.avro_name(field_name + "_enum")} + avro_primitive = {"type": "enum", "symbols": enum, "name": avro_name(field_name + "_enum")} else: avro_primitive = "string" return avro_primitive @@ -247,7 +216,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ """Convert a JSON type to Avro type.""" avro_type: list | dict | str = {} - local_name = self.avro_name(field_name if field_name else record_name) + local_name = avro_name(field_name if field_name else record_name) qualified_name = namespace + "." + local_name if isinstance(json_type, dict): @@ -311,7 +280,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if len(json_types) == 1: avro_type = self.json_type_to_avro_type(json_types[0], record_name, field_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) if isinstance(avro_type, dict) and self.is_empty_type(avro_type): - avro_type['type'] = self.generic_type() + avro_type['type'] = generic_type() return avro_type else: subtypes = [] @@ -319,7 +288,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ type_deps = [] for json_type_option in json_types: # we only set the field_name if this is not a type reference - sub_field_name = self.avro_name(local_name + "_" + str(count)) if not isinstance(json_type_option, dict) or not '$ref' in json_type_option else None + sub_field_name = avro_name(local_name + "_" + str(count)) if not isinstance(json_type_option, dict) or not '$ref' in json_type_option else None avro_subtype = self.json_type_to_avro_type(json_type_option, record_name, sub_field_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) if isinstance(avro_subtype, dict) and 'dependencies' in avro_subtype: type_deps.extend(avro_subtype['dependencies']) @@ -341,7 +310,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ avro_type['doc'] = json_type['description'] if 'title' in json_type and isinstance(avro_type, dict): - avro_type['name'] = self.avro_name(json_type['title']) + avro_type['name'] = avro_name(json_type['title']) # first, pull in any referenced definitions and merge with this schema if '$ref' in json_type: @@ -361,7 +330,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ type_namespace = namespace parsed_ref = urlparse(ref) if parsed_ref.fragment: - type_name = self.avro_name(parsed_ref.fragment.split('/')[-1]) + type_name = avro_name(parsed_ref.fragment.split('/')[-1]) sub_namespace = '.'.join(parsed_ref.fragment.split('/')[1:-1]) type_namespace = self.root_namespace + '.' + sub_namespace if len(sub_namespace) > 0 else self.root_namespace @@ -373,7 +342,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if isinstance(avro_type, list) or (not isinstance(avro_type, dict) or not avro_type.get('type') == "record"): if isinstance(avro_type, dict) and not 'type' in avro_type: print(f"WARNING: no type definition for {ref} in record {record_name}: {json.dumps(avro_type)}") - avro_type = self.generic_type() + avro_type = generic_type() avro_type = { "type": "record", @@ -411,7 +380,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ # if 'const' is present, make this an enum if 'const' in json_type: const = json_type['const'] - avro_type = self.merge_schemas([avro_type, {"type": "enum", "symbols": [const], "name": self.avro_name(local_name)}], avro_schema, local_name) + avro_type = self.merge_schemas([avro_type, {"type": "enum", "symbols": [const], "name": avro_name(local_name)}], avro_schema, local_name) json_object_type = json_type.get('type') if json_object_type: @@ -419,7 +388,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if 'items' in json_type: avro_type = self.merge_schemas([avro_type, {"type": "array", "items": self.json_type_to_avro_type(json_type['items'], record_name, field_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack)}], avro_schema, avro_type.get('name', local_name)) else: - avro_type = self.merge_schemas([avro_type, {"type": "array", "items": self.generic_type()}], avro_schema, avro_type.get('name', local_name)) + avro_type = self.merge_schemas([avro_type, {"type": "array", "items": generic_type()}], avro_schema, avro_type.get('name', local_name)) elif json_object_type == 'object' or 'object' in json_object_type: record_stack.append(record_name) avro_type = self.merge_schemas([avro_type, self.json_schema_object_to_avro_record(local_name, json_type, namespace, json_schema, base_uri, avro_schema, record_stack)], avro_schema, avro_type.get('name', local_name)) @@ -479,7 +448,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp return type title = json_object.get('title') - record_name = self.avro_name(name if name else title if title else None) + record_name = avro_name(name if name else title if title else None) if record_name == None: raise ValueError(f"Cannot determine record name for json_object {json_object}") if len(record_stack) > 0: @@ -495,7 +464,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if record_name in record_stack: # to break the cycle, we will use a containment type that references # the record that is being defined - ref_name = self.avro_name(record_name + "_ref") + ref_name = avro_name(record_name + "_ref") return { "type": "record", "name": ref_name, @@ -529,11 +498,11 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if not field_name in required_fields and not 'null' in avro_field_type: if isinstance(avro_field_type, list): avro_field_type.append('null') - avro_field = {"name": self.avro_name(field_name), "type": avro_field_type} + avro_field = {"name": avro_name(field_name), "type": avro_field_type} else: - avro_field = {"name": self.avro_name(field_name), "type": ["null", avro_field_type]} + avro_field = {"name": avro_name(field_name), "type": ["null", avro_field_type]} else: - avro_field = {"name": self.avro_name(field_name), "type": avro_field_type} + avro_field = {"name": avro_name(field_name), "type": avro_field_type} if field.get('description'): avro_field['doc'] = field['description'] @@ -544,7 +513,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp additional_props = json_object['additionalProperties'] values_type = self.json_type_to_avro_type(additional_props, record_name, record_name + "_extensions", namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) if self.is_empty_type(values_type): - values_type = self.generic_type() + values_type = generic_type() avro_record['fields'].append( { "name": record_name, @@ -565,7 +534,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp raise ValueError(f"prop_type for pattern name {pattern_name} and record_name {record_name} is empty") avro_record['fields'].append( { - "name": self.avro_name(pattern_name), + "name": avro_name(pattern_name), "namespace": namespace, "type": { "type": "map", @@ -577,7 +546,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp additional_props = json_object['additionalProperties'] values_type = self.json_type_to_avro_type(additional_props, record_name, record_name + "_extensions", namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) if self.is_empty_type(values_type): - values_type = self.generic_type() + values_type = generic_type() avro_record['fields'].append( { "name": "values", @@ -595,7 +564,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp type = self.ensure_type(self.json_type_to_avro_type(props, record_name, pattern_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack)) avro_record['fields'].append( { - "name": self.avro_name(pattern_name), + "name": avro_name(pattern_name), "namespace": namespace, "type": { "type": "map", @@ -609,7 +578,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp "name": "values", "type": { "type": "map", - "values": self.generic_type() + "values": generic_type() } }) elif 'type' in json_object and (json_object['type'] == 'array' or 'array' in json_object['type']) and \ @@ -617,7 +586,7 @@ def json_schema_object_to_avro_record(self, name: str, json_object: dict, namesp if 'items' in json_object: avro_type = self.json_type_to_avro_type(json_object['items'], record_name, 'values', namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) else: - avro_type = self.generic_type() + avro_type = generic_type() avro_record['fields'].append( { "name": "values", @@ -698,7 +667,7 @@ def process_definition(self, json_schema, namespace, base_uri, avro_schema, reco if not isinstance(avro_schema_item_list, list): avro_schema_item_list = [avro_schema_item_list] for avro_schema_item in avro_schema_item_list: - avro_schema_item['name'] = self.avro_name(schema_name) + avro_schema_item['name'] = avro_name(schema_name) existing_type = next((t for t in avro_schema if t.get('name') == avro_schema_item['name'] and t.get('namespace') == avro_schema_item.get('namespace') ), None) if not existing_type: avro_schema.append(avro_schema_item) diff --git a/avrotize/xsdtoavro.py b/avrotize/xsdtoavro.py index 16e1216..080cb94 100644 --- a/avrotize/xsdtoavro.py +++ b/avrotize/xsdtoavro.py @@ -1,144 +1,228 @@ import re +from typing import Dict, List, Tuple import xml.etree.ElementTree as ET import json import re from urllib.parse import urlparse +from avrotize.common import avro_name, generic_type from avrotize.dependency_resolver import inline_dependencies_of, sort_messages_by_dependencies xsd_namespace = 'http://www.w3.org/2001/XMLSchema' -def xsd_targetnamespace_to_avro_namespace(targetnamespace: str) -> str: - """Convert a XSD namespace to Avro Namespace.""" - parsed_url = urlparse(targetnamespace) - path_segments = parsed_url.path.strip('/').split('/') - reversed_path_segments = reversed(path_segments) - namespace_prefix = '.'.join(reversed_path_segments) - namespace_suffix = parsed_url.hostname - namespace = f"{namespace_prefix}.{namespace_suffix}" - return namespace - -def xsd_to_avro_type(xsd_type: str, namespaces: dict): - # split the type on the first colon - prefix, type = xsd_type.split(':', 1) - if not type: - type = prefix - prefix = '' - # find the namespace for the prefix - ns = namespaces.get(xsd_namespace, '') - if ns == prefix: - base_type_map = { - 'string': 'string', - 'int': 'int', - 'integer': 'int', - 'long': 'long', - 'short': 'int', - 'decimal': 'float', - 'float': 'float', - 'double': 'double', - 'boolean': 'boolean', - 'date': {'type': 'int', 'logicalType': 'date'}, - 'dateTime': {'type': 'long', 'logicalType': 'timestamp-millis'}, - } - return base_type_map.get(type, type) - else: - return type - - - -def process_element(element: ET.Element, namespaces: dict, avro_schema: list, avro_namespace: str, dependencies: list): - name = element.get('name') - type = element.get('type','') - minOccurs = element.get('minOccurs') - maxOccurs = element.get('maxOccurs') - - if type.startswith(f'{namespaces[xsd_namespace]}:'): - avro_type = xsd_to_avro_type(type, namespaces) - else: - avro_type = type - dependencies.append(type) - - if maxOccurs is not None and maxOccurs != '1': - avro_type = {'type' : 'array', 'items': avro_type} - if minOccurs is not None and minOccurs == '0': - avro_type = ['null', avro_type] - - return {'name': name, 'type': avro_type} - -def process_complex_type(complex_type: ET.Element, namespaces: dict, avro_schema: list, avro_namespace: str): - dependencies = [] - avro_type = { - 'type': 'record', - 'name': complex_type.attrib.get('name'), - 'namespace': avro_namespace, - 'fields': [] - } - fields = [] - for el in complex_type.findall(f'.//{{{xsd_namespace}}}element', namespaces): - fields.append(process_element(el, namespaces, avro_schema, avro_namespace, dependencies)) - avro_type['fields'] = fields - if dependencies: - avro_type['dependencies'] = dependencies - return avro_type - -def process_top_level_element(element: ET.Element, namespaces: dict, avro_schema: list, avro_namespace: str): - dependencies = [] - avro_type = { - 'type': 'record', - 'name': element.attrib.get('name'), - 'namespace': avro_namespace, - 'fields': [] - } - - fields = [] - for el in element.findall(f'.//{{{xsd_namespace}}}element', namespaces): - fields.append(process_element(el, namespaces, avro_schema, avro_namespace, dependencies)) - avro_type['fields'] = fields - if dependencies: - avro_type['dependencies'] = dependencies - return avro_type - -def extract_xml_namespaces(xml_str: str): - # This regex finds all xmlns:prefix="uri" declarations - pattern = re.compile(r'xmlns:([\w]+)="([^"]+)"') - namespaces = {m.group(2): m.group(1) for m in pattern.finditer(xml_str)} - return namespaces - -def xsd_to_avro(xsd_path: str): - # load the XSD file into a string - with open(xsd_path, 'r') as f: - xsd = f.read() - - namespaces = extract_xml_namespaces(xsd) - root = ET.fromstring(xsd) - targetNamespace = root.get('targetNamespace') - if targetNamespace is None: - raise ValueError('targetNamespace not found') - avro_namespace = xsd_targetnamespace_to_avro_namespace(targetNamespace) - ET.register_namespace(namespaces[xsd_namespace], xsd_namespace) - avro_schema = [] - - for complex_type in root.findall(f'./{{{xsd_namespace}}}complexType', namespaces): - avro_schema.append(process_complex_type(complex_type, namespaces, avro_schema, avro_namespace)) - - top_level_elements = root.findall(f'./{{{xsd_namespace}}}element', namespaces) - if len(top_level_elements) == 1: - record = process_top_level_element(top_level_elements[0], namespaces, avro_schema, avro_namespace) - inline_dependencies_of(avro_schema, record) - return record - for element in top_level_elements: - avro_schema.append(process_top_level_element(element, namespaces, avro_schema, avro_namespace)) - - avro_schema = sort_messages_by_dependencies(avro_schema) - if len(avro_schema) == 1: - return avro_schema[0] - else: - return avro_schema - -def convert_xsd_to_avro(xsd_path: str, avro_path: str): - avro_schema = xsd_to_avro(xsd_path) - with open(avro_path, 'w') as f: - json.dump(avro_schema, f, indent=4) - -# This approach dynamically handles namespaces by extracting the XSD namespace URI from the document -# and then uses it to correctly identify and process elements, types, etc. +class XSDToAvro: + + def __init__(self) -> None: + self.simple_type_map: Dict[str,str] = {} + self.avro_namespace = '' + + def xsd_targetnamespace_to_avro_namespace(self, targetnamespace: str) -> str: + """Convert a XSD namespace to Avro Namespace.""" + parsed_url = urlparse(targetnamespace) + path_segments = parsed_url.path.strip('/').split('/') + reversed_path_segments = reversed(path_segments) + namespace_prefix = '.'.join(reversed_path_segments) + namespace_suffix = parsed_url.hostname + namespace = f"{namespace_prefix}.{namespace_suffix}" + return avro_name(namespace) + + def xsd_to_avro_type(self, xsd_type: str, namespaces: dict): + """Convert a XSD type to an Avro type.""" + if xsd_type in self.simple_type_map: + return self.simple_type_map[xsd_type] + + # split the type on the first colon + if ':' not in xsd_type: + type = xsd_type + prefix = '' + else: + prefix, type = xsd_type.split(':', 1) + if not type: + type = prefix + prefix = '' + # find the namespace for the prefix + ns = namespaces.get(xsd_namespace, '') + if ns == prefix: + base_type_map = { + 'string': 'string', + 'int': 'int', + 'integer': 'int', + 'long': 'long', + 'short': 'int', + 'decimal': 'float', + 'float': 'float', + 'double': 'double', + 'boolean': 'boolean', + 'date': {'type': 'int', 'logicalType': 'date'}, + 'dateTime': {'type': 'long', 'logicalType': 'timestamp-millis'}, + } + return base_type_map.get(type, self.avro_namespace+'.'+type) + else: + return self.avro_namespace+'.'+type + + + + def process_element(self, element: ET.Element, namespaces: dict, avro_schema: list, dependencies: list): + name = element.get('name') + type = element.get('type','') + minOccurs = element.get('minOccurs') + maxOccurs = element.get('maxOccurs') + if type.startswith(f'{namespaces[xsd_namespace]}:'): + avro_type = self.xsd_to_avro_type(type, namespaces) + else: + avro_type = self.xsd_to_avro_type(type, namespaces) + if not type.startswith(f'{namespaces[xsd_namespace]}:') and type not in self.simple_type_map.keys(): + dependencies.append(avro_type if isinstance(avro_type, str) else avro_type.get('namespace')+'.'+avro_type.get('name')) + if maxOccurs is not None and maxOccurs != '1': + avro_type = {'type' : 'array', 'items': avro_type} + if minOccurs is not None and minOccurs == '0': + avro_type = ['null', avro_type] + return {'name': name, 'type': avro_type} + + + def process_complex_type(self, complex_type: ET.Element, namespaces: dict, avro_schema: list) -> dict | str: + dependencies: List[str] = [] + avro_type: dict = { + 'type': 'record', + 'name': complex_type.attrib.get('name'), + 'namespace': self.avro_namespace, + 'fields': [] + } + fields = [] + for sequence in complex_type.findall(f'{{{xsd_namespace}}}sequence', namespaces): + for el in sequence.findall(f'{{{xsd_namespace}}}element', namespaces): + fields.append(self.process_element(el, namespaces, avro_schema, dependencies)) + if sequence.findall(f'{{{xsd_namespace}}}any', namespaces): + fields.append({ "name": "any", "type": generic_type() }) + for all in complex_type.findall(f'{{{xsd_namespace}}}all', namespaces): + for el in all.findall(f'{{{xsd_namespace}}}element', namespaces): + fields.append(self.process_element(el, namespaces, avro_schema, dependencies)) + for choice in complex_type.findall(f'{{{xsd_namespace}}}choice', namespaces): + choices: list = [] + for el in choice.findall(f'{{{xsd_namespace}}}element', namespaces): + deps: List [str] = [] + choice_field = self.process_element(el, namespaces, avro_schema, deps) + choice_record = { + 'type': 'record', + 'name': f'{complex_type.attrib.get("name")}_{choice_field["name"]}', + 'fields': [choice_field], + 'namespace': self.avro_namespace + } + choices.append(choice_record) + dependencies.extend(deps) + choices_field = { + 'name': f'{complex_type.attrib.get("name")}', + 'type': choices + } + fields.append(choices_field) + for attribute in complex_type.findall(f'.{{{xsd_namespace}}}attribute', namespaces): + fields.append(self.process_element(attribute, namespaces, avro_schema, dependencies)) + for el in complex_type.findall(f'{{{xsd_namespace}}}simpleContent', namespaces): + simple_content = el.find(f'{{{xsd_namespace}}}extension', namespaces) + if simple_content is not None: + baseType = simple_content.attrib.get('base') + if baseType: + fields.append({"name": "value", "type": self.xsd_to_avro_type(baseType, namespaces)}) + for se in simple_content.findall(f'{{{xsd_namespace}}}attribute', namespaces): + fields.append(self.process_element(se, namespaces, avro_schema, dependencies)) + else: + raise ValueError("No base found in simpleContent") + + avro_type['fields'] = fields + if dependencies: + avro_type['dependencies'] = dependencies + return avro_type + + def process_simple_type(self, simple_type: ET.Element, namespaces: dict, avro_schema: list) -> Tuple[bool, dict | str]: + type_name = simple_type.attrib.get('name') + if not type_name: + raise ValueError("SimpleType must have a name") + + for restriction in simple_type.findall(f'{{{xsd_namespace}}}restriction', namespaces): + baseType = restriction.get('base') + enums = [el.attrib.get('value') for el in restriction.findall(f'{{{xsd_namespace}}}enumeration', namespaces)] + if enums: + return True, { + 'type': 'enum', + 'name': simple_type.attrib.get('name'), + 'namespace': self.avro_namespace, + 'symbols': enums + } + elif baseType: + self.simple_type_map[type_name] = self.xsd_to_avro_type(baseType, namespaces) + return False, self.simple_type_map[type_name] + raise ValueError("No content found in simple type") + + def process_top_level_element(self, element: ET.Element, namespaces: dict, avro_schema: list): + dependencies: List[str] = [] + avro_type: dict = { + 'type': 'record', + 'name': 'Root', + 'namespace': self.avro_namespace, + 'fields': [] + } + + if 'type' in element.attrib: + avro_type['fields'].append(self.process_element(element, namespaces, avro_schema, dependencies)) + if dependencies: + avro_type['dependencies'] = dependencies + return avro_type + else: + fields = [] + for el in element.findall(f'.//{{{xsd_namespace}}}complexType', namespaces): + fields.append(self.process_element(el, namespaces, avro_schema, dependencies)) + avro_type['fields'] = fields + if dependencies: + avro_type['dependencies'] = dependencies + return avro_type + + def extract_xml_namespaces(self, xml_str: str): + # This regex finds all xmlns:prefix="uri" declarations + pattern = re.compile(r'xmlns:([\w]+)="([^"]+)"') + namespaces = {m.group(2): m.group(1) for m in pattern.finditer(xml_str)} + return namespaces + + def xsd_to_avro(self, xsd_path: str): + # load the XSD file into a string + with open(xsd_path, 'r') as f: + xsd = f.read() + + namespaces = self.extract_xml_namespaces(xsd) + root = ET.fromstring(xsd) + targetNamespace = root.get('targetNamespace') + if targetNamespace is None: + raise ValueError('targetNamespace not found') + self.avro_namespace = self.xsd_targetnamespace_to_avro_namespace(targetNamespace) + ET.register_namespace(namespaces[xsd_namespace], xsd_namespace) + avro_schema: List[dict|list|str] = [] + + for simple_type in root.findall(f'{{{xsd_namespace}}}simpleType', namespaces): + add_to_schema, simple_type_type = self.process_simple_type(simple_type, namespaces, avro_schema) + # we only want to append simple types if they are not resolved to one of the base types + if add_to_schema: + avro_schema.append(simple_type_type) + for complex_type in root.findall(f'{{{xsd_namespace}}}complexType', namespaces): + avro_schema.append(self.process_complex_type(complex_type, namespaces, avro_schema)) + + top_level_elements = root.findall(f'{{{xsd_namespace}}}element', namespaces) + if len(top_level_elements) == 1: + record = self.process_top_level_element(top_level_elements[0], namespaces, avro_schema) + inline_dependencies_of(avro_schema, record) + return record + for element in top_level_elements: + avro_schema.append(self.process_top_level_element(element, namespaces, avro_schema)) + + avro_schema = sort_messages_by_dependencies(avro_schema) + if len(avro_schema) == 1: + return avro_schema[0] + else: + return avro_schema + + def convert_xsd_to_avro(self, xsd_path: str, avro_path: str, namespace: str | None = None): + avro_schema = self.xsd_to_avro(xsd_path) + with open(avro_path, 'w') as f: + json.dump(avro_schema, f, indent=4) + +def convert_xsd_to_avro(xsd_path: str, avro_path: str, namespace: str | None = None): + xsd_to_avro = XSDToAvro() + xsd_to_avro.convert_xsd_to_avro(xsd_path, avro_path, namespace) \ No newline at end of file