Skip to content

Commit

Permalink
JSON dependency resolution
Browse files Browse the repository at this point in the history
Signed-off-by: Clemens Vasters <clemens@vasters.com>
  • Loading branch information
clemensv committed Feb 20, 2024
1 parent 39d6a3b commit 20247d4
Show file tree
Hide file tree
Showing 4 changed files with 2,430 additions and 70 deletions.
51 changes: 36 additions & 15 deletions avrotize/dependency_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,21 @@ def inline_dependencies_of(avro_schema, record):


def sort_messages_by_dependencies(avro_schema):

"""
Sort the messages in avro_schema by their dependencies. Avro Schema requires
that type definitions must be defined before they are used. This method
ensures this. Types that have dependencies will be moved at the end of the list.
If necessary, it will also resolve circular dependencies by inlining the
dependent record.
The method expects all types with dependencies to have a 'dependencies' key in their
dict that contains a list of types that they depend on.
Args:
avro_schema: List of Avro schema records.
"""

# if all are just strings, then it is already sorted
if all(isinstance(record, str) for record in avro_schema):
return avro_schema

Expand All @@ -27,12 +41,8 @@ def sort_messages_by_dependencies(avro_schema):
sorted_messages.append(record)
avro_schema.remove(record)
continue
for field in record.get('fields', []):
if isinstance(field['type'], dict):
field['type'] = sort_messages_by_dependencies([field['type']])[0]
elif isinstance(field['type'], list):
field['type'] = sort_messages_by_dependencies(field['type'])

# if this record is not a dependency of any other record, it can be safely emitted now
if not any(record.get('name') in other_record.get('dependencies', [])
or (record.get('namespace','')+'.'+record.get('name')) in other_record.get('dependencies', []) for other_record in [x for x in avro_schema if isinstance(x, dict) and 'name' in x]):
if 'dependencies' in record:
Expand All @@ -41,11 +51,15 @@ def sort_messages_by_dependencies(avro_schema):
avro_schema.remove(record)
found = True

# If there are no records without dependencies, so we just take one record and move on
# If there are no records without dependencies, we will grab the first
# record with dependencies and start resolving circular dependencies
if len(avro_schema) > 0 and not found:
record = avro_schema[0]
if 'dependencies' in record:
record = next((x for x in avro_schema if isinstance(x, dict) and 'dependencies' in x), None)
if record:
avro_schema_len = len(avro_schema)
swap_record_dependencies(avro_schema, record)
if len(avro_schema) == avro_schema_len:
inline_dependencies_of(avro_schema, record)

sorted_messages.reverse()
return sorted_messages
Expand All @@ -61,6 +75,11 @@ def swap_record_dependencies(avro_schema, record):
swap_dependency_type(avro_schema, field, dependency, dependency_type, deps)
record['dependencies'] = [dep for dep in deps if dep != record['name'] and record.get('namespace','')+'.'+record['name'] != dep]

def strip_namespace(name):
if isinstance(name, str):
return name.split('.')[-1]
return name

def swap_dependency_type(avro_schema, field, dependency, dependency_type, dependencies):
""" to break circular dependencies, we will inline the dependent record and remove the dependency """
if not dependency in dependencies:
Expand All @@ -69,7 +88,7 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
return

# Replace the dependency type with the dependency_type in avro_schema.
if field['type'] == dependency:
if strip_namespace(field['type']) == dependency:
field['type'] = dependency_type
if dependency_type in avro_schema:
avro_schema.remove(dependency_type)
Expand All @@ -81,8 +100,8 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
# type is a Union?
elif type(field['type']) is list:
for field_type in field['type']:
if field_type == dependency:
field['type'].remove(dependency)
if strip_namespace(field_type) == dependency:
field['type'].remove(field_type)
field['type'].append(dependency_type)
if dependency_type in avro_schema:
avro_schema.remove(dependency_type)
Expand All @@ -92,12 +111,14 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
swap_record_dependencies(avro_schema, dependency_type)
del dependency_type['dependencies']
# type is an object?
elif type(field_type) is dict and field_type.get('type') != None and field_type.get('name') == dependency:
elif isinstance(field_type, dict) and 'type' in field_type and field_type.get('type') == dependency or \
'items' in field_type and field_type.get('items') == dependency or \
'values' in field_type and field_type.get('values') == dependency:
swap_dependency_type(avro_schema, field_type, dependency, dependency_type, dependencies)
elif 'type' in field['type']:
swap_dependency_type(avro_schema, field['type'], dependency, dependency_type, dependencies)
elif field['type'] == 'array':
if field['items'] == dependency:
if strip_namespace(field['items']) == dependency:
field['items'] = dependency_type
if dependency_type in avro_schema:
avro_schema.remove(dependency_type)
Expand All @@ -109,7 +130,7 @@ def swap_dependency_type(avro_schema, field, dependency, dependency_type, depend
elif 'type' in field['items']:
swap_dependency_type(avro_schema, field['items'], dependency, dependency_type, dependencies)
elif field['type'] == 'map':
if field['values'] == dependency:
if strip_namespace(field['values']) == dependency:
field['values'] = dependency_type
if dependency_type in avro_schema:
avro_schema.remove(dependency_type)
Expand Down
Loading

0 comments on commit 20247d4

Please sign in to comment.