Skip to content

Commit

Permalink
XSD and better dependency resolution
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Feb 22, 2024
1 parent d515391 commit c797612
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 210 deletions.
32 changes: 32 additions & 0 deletions avrotize/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import re

def avro_name(name: str) -> str:
    """Convert a name into an Avro name.

    Every character outside ``[A-Za-z0-9_]`` is replaced with ``_``. An Avro
    name has to begin with a letter or an underscore, so an underscore is
    prefixed when the sanitized text starts with a digit, and an empty input
    yields ``"_"`` rather than an empty (invalid) name.

    Args:
        name: The raw identifier to sanitize.

    Returns:
        The sanitized, non-empty identifier.
    """
    val = re.sub(r'[^a-zA-Z0-9_]', '_', name)
    if not val:
        # Nothing to sanitize: fall back to the shortest legal name.
        return '_'
    # Only ASCII letters, digits and '_' survive the substitution above, so
    # str.isdigit() is equivalent to matching [0-9] here.
    if val[0].isdigit():
        val = '_' + val
    return val

def generic_type() -> list[str | dict]:
simple_type_union: list[str | dict] = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"]
l2 = simple_type_union.copy()
l2.extend([
{
"type": "array",
"items": simple_type_union
},
{
"type": "map",
"values": simple_type_union
}])
l1 = simple_type_union.copy()
l1.extend([
{
"type": "array",
"items": l2
},
{
"type": "map",
"values": l2
}])
return l1
45 changes: 26 additions & 19 deletions avrotize/dependency_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ def sort_messages_by_dependencies(avro_schema):
remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]]
if len(remaining_deps) > 0:
swap_record_dependencies(avro_schema, record)
remaining_remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]]
found = len(remaining_deps) != len(remaining_remaining_deps)
if found:
break
if isinstance(record, dict) and not 'dependencies' in record:
found = True
else:
remaining_remaining_deps = [dep for dep in record['dependencies'] if not dep in [x.get('namespace')+'.'+x.get('name') for x in sorted_messages]]
found = len(remaining_deps) != len(remaining_remaining_deps)
if found:
break

if not found:
found = False
Expand All @@ -92,15 +95,18 @@ def sort_messages_by_dependencies(avro_schema):
return sorted_messages

# Rendered diff hunk: the +/- markers and the indentation were lost, so two
# versions of this function's body appear back to back. Judging by the hunk
# header (-92,15 +95,18), the run from the first `for dependency` through the
# first `record['dependencies'] = ...` looks like the removed body and the run
# starting at `while` looks like the added body — TODO confirm against the repository.
def swap_record_dependencies(avro_schema, record):
# --- apparent pre-commit body ---
# For each dependency name, look up the schema entry whose 'name' or
# 'namespace.name' equals it.
for dependency in record.get('dependencies', []):
dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None)
# Names with no matching schema entry are skipped.
if not dependency_type:
continue
deps = record.get('dependencies', [])
for field in record['fields']:
# A record's reference to itself is not handed to swap_dependency_type.
if record['name'] != dependency and (record.get('namespace','')+'.'+record['name']) != dependency:
swap_dependency_type(avro_schema, field, dependency, dependency_type, deps)
# Rebuild the list without entries that name the record itself.
record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep]
# --- apparent post-commit body ---
# Repeats for as long as the record still carries a non-empty 'dependencies' list.
while 'dependencies' in record and len(record['dependencies']) > 0:
for dependency in record.get('dependencies', []):
dependency_type = next((x for x in avro_schema if x['name'] == dependency or x.get('namespace','')+'.'+x['name'] == dependency), None)
# A name with no matching schema entry is removed from the record's list.
# NOTE(review): when the key exists, record.get() returns the same list object
# that the enclosing `for` is iterating, so removing here can make the loop skip
# the next element. There is also no `continue` after the removal, so —
# depending on the lost nesting — swap_dependency_type may be reached with
# dependency_type set to None. Confirm against the real file.
if not dependency_type and dependency in record['dependencies']:
record['dependencies'].remove(dependency)
deps = record.get('dependencies', [])
for field in record['fields']:
# A record's reference to itself is not handed to swap_dependency_type.
if record['name'] != dependency and (record.get('namespace','')+'.'+record['name']) != dependency:
swap_dependency_type(avro_schema, field, dependency, dependency_type, deps)
# Rebuild the list without entries that name the record itself.
record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep]
# Once the list is empty the key is deleted, which also makes the `while` test false.
if len(record['dependencies']) == 0:
del record['dependencies']


def swap_dependency_type(avro_schema, field, dependency, dependency_type, dependencies):
Expand All @@ -119,7 +125,7 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']

# type is a Union?
elif isinstance(field['type'], list):
for field_type in field['type']:
Expand All @@ -132,12 +138,17 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
# type is an object?
elif isinstance(field_type, dict) and 'type' in field_type and field_type.get('type') == dependency or \
'items' in field_type and field_type.get('items') == dependency or \
'values' in field_type and field_type.get('values') == dependency:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies)
elif isinstance(field_type, list):
for item in field_type:
if item == dependency or isinstance(item, dict) and item.get('type') == dependency:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies)
elif isinstance(field_type, dict) and 'type' in field_type:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies)
elif isinstance(field['type'], dict) and 'type' in field['type']:
swap_dependency_type(avro_schema, field['type'], dependency, dependency_type, dependencies)
elif field['type'] == 'array':
Expand All @@ -152,7 +163,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
elif isinstance(item, dict) and 'type' in item and item.get('type') == dependency:
swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies)
elif field['items'] == dependency:
Expand All @@ -163,7 +173,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
elif 'type' in field['items']:
swap_dependency_type(avro_schema, field['items'], dependency, dependency_type, dependencies)
elif field['type'] == 'map':
Expand All @@ -178,7 +187,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
elif isinstance(item, dict) and 'type' in item and item.get('type') == dependency:
swap_dependency_type(avro_schema, item, dependency, dependency_type, dependencies)
if field['values'] == dependency:
Expand All @@ -189,7 +197,6 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
dependencies.extend(dependency_type.get('dependencies', []))
if 'dependencies' in dependency_type:
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
elif 'type' in field['values']:
swap_dependency_type(avro_schema, field['values'], dependency, dependency_type, dependencies)
elif field['type'] == 'record':
Expand Down
Loading

0 comments on commit c797612

Please sign in to comment.