From 3761b69d12c77a17a3f753f794776b4b7e111090 Mon Sep 17 00:00:00 2001 From: Clemens Vasters Date: Thu, 22 Feb 2024 12:34:27 +0100 Subject: [PATCH] Proto reworked Signed-off-by: Clemens Vasters --- avrotize/avrotize.py | 6 +- avrotize/avrotoproto.py | 543 ++++++++++++++++++++++++---------------- avrotize/jsonstoavro.py | 39 +-- 3 files changed, 355 insertions(+), 233 deletions(-) diff --git a/avrotize/avrotize.py b/avrotize/avrotize.py index f94be60..bd66fb3 100644 --- a/avrotize/avrotize.py +++ b/avrotize/avrotize.py @@ -21,6 +21,8 @@ def main(): p2a_parser = subparsers.add_parser('a2p', help='Convert Proto schema to Avro schema') p2a_parser.add_argument('--proto', type=str, help='Path to the Proto file', required=True) p2a_parser.add_argument('--avsc', type=str, help='Path to the Avro schema file', required=True) + p2a_parser.add_argument('--naming', type=str, help='Type naming convention', choices=['snake', 'camel', 'pascal'], required=False, default='pascal') + p2a_parser.add_argument('--allow-optional', action='store_true', help='Enable support for "optional" fields', default=False) j2a_parser = subparsers.add_parser('j2a', help='Convert JSON schema to Avro schema') j2a_parser.add_argument('--jsons', type=str, help='Path to the JSON schema file', required=True) @@ -66,7 +68,9 @@ def main(): elif args.command == 'a2p': proto_schema_path = args.proto avro_file_path = args.avsc - convert_avro_to_proto(avro_file_path, proto_schema_path) + naming = args.naming + allow_optional = args.allow_optional + convert_avro_to_proto(avro_file_path, proto_schema_path, naming_mode=naming, allow_optional=allow_optional) elif args.command == 'j2a': json_schema_file_path = args.jsons avro_schema_path = args.avsc diff --git a/avrotize/avrotoproto.py b/avrotize/avrotoproto.py index 12f275c..c21adf7 100644 --- a/avrotize/avrotoproto.py +++ b/avrotize/avrotoproto.py @@ -2,235 +2,352 @@ import json import argparse import os -import typing +from typing import Literal, NamedTuple, Dict, Any, List indent = ' ' -Comment = typing.NamedTuple('Comment', [('content', str), ('tags', typing.Dict[str, typing.Any])]) -Oneof = typing.NamedTuple('Oneof', [('comment', 'Comment'), ('name', str), ('fields', typing.List['Field'])]) -Field = typing.NamedTuple('Field', [('comment', 'Comment'), ('label', str), ('type', str), ('key_type', str), ('val_type', str), ('name', str), ('number', int)]) -Enum = typing.NamedTuple('Enum', [('comment', 'Comment'), ('name', str), ('fields', typing.Dict[str, 'Field'])]) -Message = typing.NamedTuple('Message', [('comment', 'Comment'), ('name', str), ('fields', typing.List['Field']), ('oneofs', typing.List['Oneof']), - ('messages', typing.Dict[str, 'Message']), ('enums', typing.Dict[str, 'Enum'])]) -Service = typing.NamedTuple('Service', [('name', str), ('functions', typing.Dict[str, 'RpcFunc'])]) -RpcFunc = typing.NamedTuple('RpcFunc', [('name', str), ('in_type', str), ('out_type', str), ('uri', str)]) -ProtoFile = typing.NamedTuple('ProtoFile', - [('messages', typing.Dict[str, 'Message']), ('enums', typing.Dict[str, 'Enum']), - ('services', typing.Dict[str, 'Service']), ('imports', typing.List[str]), - ('options', typing.Dict[str, str]), ('package', str)]) -ProtoFiles = typing.NamedTuple('ProtoFiles', [('files', typing.List['ProtoFile'])]) +Comment = NamedTuple('Comment', [('content', str), ('tags', Dict[str, Any])]) +Oneof = NamedTuple('Oneof', [('comment', 'Comment'), ('name', str), ('fields', List['Field'])]) +Field = NamedTuple('Field', [('comment', 'Comment'), ('label', str), ('type', str), ('key_type', str), ('val_type', str), ('name', str), ('number', int), ('dependencies', List[str])]) +Enum = NamedTuple('Enum', [('comment', 'Comment'), ('name', str), ('fields', Dict[str, 'Field'])]) +Message = NamedTuple('Message', [('comment', 'Comment'), ('name', str), ('fields', List['Field']), ('oneofs', List['Oneof']), + ('messages', Dict[str, 'Message']), ('enums', Dict[str, 'Enum']), ('dependencies', List[str])]) +Service = NamedTuple('Service', [('name', str), ('functions', Dict[str, 'RpcFunc'])]) +RpcFunc = NamedTuple('RpcFunc', [('name', str), ('in_type', str), ('out_type', str), ('uri', str)]) +ProtoFile = NamedTuple('ProtoFile', + [('messages', Dict[str, 'Message']), ('enums', Dict[str, 'Enum']), + ('services', Dict[str, 'Service']), ('imports', List[str]), + ('options', Dict[str, str]), ('package', str)]) +ProtoFiles = NamedTuple('ProtoFiles', [('files', List['ProtoFile'])]) +class AvroToProto: + + def __init__(self) -> None: + self.naming_mode: Literal['snake', 'pascal', 'camel'] = 'pascal' + self.allow_optional: bool = False -def avro_primitive_to_proto_type(avro_type): - """Map Avro primitive types to Protobuf types.""" - mapping = { - 'null': 'google.protobuf.Empty', # Special handling may be required - 'boolean': 'bool', - 'int': 'int32', - 'long': 'int64', - 'float': 'float', - 'double': 'double', - 'bytes': 'bytes', - 'string': 'string', - } - # logical types require special handling - if isinstance(avro_type, dict) and 'logicalType' in avro_type: - logical_type = avro_type['logicalType'] - if logical_type == 'date': - return 'string' - elif logical_type == 'time-millis': - return 'string' - elif logical_type == 'timestamp-millis': - return 'string' - elif logical_type == 'decimal': - precision = avro_type['precision'] - scale = avro_type['scale'] - return 'string' - elif logical_type == 'duration': - return 'string' - elif logical_type == 'uuid': - return 'string' - - return mapping.get(avro_type, avro_type) # Default to string for unmapped types + def avro_primitive_to_proto_type(self, avro_type: str, dependencies: List[str]) -> str: + """Map Avro primitive types to Protobuf types.""" + mapping = { + 'null': 'google.protobuf.Empty', # Special handling may be required + 'boolean': 'bool', + 'int': 'int32', + 'long': 'int64', + 'float': 'float', + 'double': 'double', + 'bytes': 'bytes', + 'string': 'string', + } + # logical types require special handling + if isinstance(avro_type, dict) and 'logicalType' in avro_type: + logical_type = avro_type['logicalType'] + if logical_type == 'date': + return 'string' + elif logical_type == 'time-millis': + return 'string' + elif logical_type == 'timestamp-millis': + return 'string' + elif logical_type == 'decimal': + precision = avro_type['precision'] + scale = avro_type['scale'] + return 'string' + elif logical_type == 'duration': + return 'string' + elif logical_type == 'uuid': + return 'string' + + type = mapping.get(avro_type, '') + if not type: + dependencies.append(avro_type) + type = avro_type + return type -def convert_field(message: Message, avro_field: dict, index: int, proto_files: ProtoFiles) -> Field | Oneof | Enum: - """Convert an Avro field to a Protobuf field.""" - field_type = avro_field['type'] - field_name = avro_field['name'] if 'name' in avro_field else field_type.split('.')[-1]+'Value' if isinstance(field_type, str) else f"{index}Value" - if 'doc' in avro_field: - comment = Comment(avro_field["doc"], {}) - else: - comment = Comment('',{}) - - return convert_field_type(message, field_name, field_type, comment, index, proto_files) - -def convert_record_type(avro_record: dict, comment: Comment, proto_files: ProtoFiles) -> Message: - local_message = Message(comment, avro_record['name'], [], [], {}, {}) - for i, f in enumerate(avro_record['fields']): - field = convert_field(local_message, f, i+1, proto_files) - if isinstance(field, Oneof): - local_message.oneofs.append(field) - elif isinstance(field, Enum): - enum = Enum(field.comment, field.name+"_enum", field.fields) - local_message.enums[enum.name] = enum - local_message.fields.append(Field(field.comment, '', enum.name, '', '', field.name.split('.')[-1], i+1)) - elif isinstance(field, Message): - inner_message = Message(field.comment, field.name+"_type", field.fields, field.oneofs, field.messages, field.enums) - local_message.messages[inner_message.name] = inner_message - local_message.fields.append(Field(field.comment, '', inner_message.name, '', '', field.name.split('.')[-1], i+1)) + def compose_name(self, prefix: str, name: str, naming_mode: Literal['pascal', 'camel', 'snake', 'default', 'field'] = 'default') -> str: + if naming_mode == 'default': + naming_mode = self.naming_mode + if naming_mode == 'field': + if self.naming_mode == 'pascal': + naming_mode = 'camel' + else: + naming_mode = self.naming_mode + if naming_mode == 'snake': + return f"{prefix}_{name}" + if naming_mode == 'pascal': + return f"{prefix[0].upper()+prefix[1:] if prefix else ''}{name[0].upper()+name[1:] if name else ''}" + if naming_mode == 'camel': + return f"{prefix[0].lower()+prefix[1:] if prefix else ''}{name[0].upper()+name[1:] if name else ''}" + return prefix+name + + def convert_field(self, message: Message, avro_field: dict, index: int, proto_files: ProtoFiles) -> Field | Oneof | Enum | Message: + """Convert an Avro field to a Protobuf field.""" + field_type = avro_field['type'] + field_name = avro_field['name'] if 'name' in avro_field else self.compose_name(field_type.split('.')[-1],'value', 'field') if isinstance(field_type, str) else self.compose_name(f"_{index}", 'value', 'field') + if 'doc' in avro_field: + comment = Comment(avro_field["doc"], {}) else: - local_message.fields.append(field) - return local_message + comment = Comment('',{}) + + return self.convert_field_type(message, field_name, field_type, comment, index, proto_files) + + def convert_record_type(self, avro_record: dict, comment: Comment, proto_files: ProtoFiles) -> Message: + """Convert an Avro record to a Protobuf message.""" + local_message = Message(comment, avro_record['name'], [], [], {}, {}, []) + offs = 1 + for i, f in enumerate(avro_record['fields']): + field = self.convert_field(local_message, f, i+offs, proto_files) + if isinstance(field, Oneof): + for f in field.fields: + local_message.dependencies.extend(f.dependencies) + local_message.oneofs.append(field) + offs += len(field.fields)-1 + elif isinstance(field, Enum): + enum = Enum(field.comment, self.compose_name(field.name,'enum'), field.fields) + local_message.enums[enum.name] = enum + local_message.fields.append(Field(field.comment, '', enum.name, '', '', field.name.split('.')[-1], i+offs, [])) + elif isinstance(field, Message): + inner_message = Message(field.comment, self.compose_name(field.name,'type'), field.fields, field.oneofs, field.messages, field.enums, []) + local_message.messages[inner_message.name] = inner_message + local_message.fields.append(Field(field.comment, '', inner_message.name, '', '', field.name.split('.')[-1], i+offs, [])) + local_message.dependencies.extend(field.dependencies) + else: + local_message.dependencies.extend(field.dependencies) + local_message.fields.append(field) + return local_message -def convert_field_type(message: Message, field_name: str, field_type: str | dict | list, comment: Comment, index: int, proto_files: ProtoFiles) -> Field | Oneof | Enum: - """Convert an Avro field type to a Protobuf field type.""" - label = '' - - if isinstance(field_type, list): - # Handling union types (including nullable fields) - non_null_types = [t for t in field_type if t != 'null'] - if len(non_null_types) == 1: - label = 'optional' - field_type = non_null_types[0] - elif len(non_null_types) > 0: - oneof_fields = [] - for i, t in enumerate(non_null_types): - field = convert_field_type(message, field_name, t, comment, i+1, proto_files) - if isinstance(field, Field): - field = Field(field.comment, field.label, field.type, field.key_type, field.val_type, field.type.split('.')[-1], i+1) - oneof_fields.append(field) - elif isinstance(field, Oneof): - local_message = Message(comment, field.name, [], [], {}, {}) - local_message.oneofs.append(field) + def convert_field_type(self, message: Message, field_name: str, field_type: str | dict | list, comment: Comment, index: int, proto_files: ProtoFiles) -> Field | Oneof | Enum | Message: + """Convert an Avro field type to a Protobuf field type.""" + label = '' + + if isinstance(field_type, list): + # Handling union types (including nullable fields) + non_null_types = [t for t in field_type if t != 'null'] + if len(non_null_types) == 1: + if self.allow_optional: + label = 'optional' + field_type = non_null_types[0] + elif len(non_null_types) > 0: + oneof_fields = [] + for i, t in enumerate(non_null_types): + field = self.convert_field_type(message, self.compose_name(field_name,'choice', 'field'), t, comment, i+index, proto_files) + if isinstance(field, Field): + if field.type == 'map' or field.type == 'array': + local_message = Message(comment, self.compose_name(field.name,field.type), [], [], {}, {}, field.dependencies) + local_message.fields.append(field) + new_field = Field(field.comment, '', local_message.name, '', '', self.compose_name(field.name.split('.')[-1],field.type, 'field'), i+index, field.dependencies) + message.messages[local_message.name] = local_message + oneof_fields.append(new_field) + else: + field = Field(field.comment, field.label, field.type, field.key_type, field.val_type, self.compose_name(field_name, (field.type.split('.')[-1]), 'field'), i+index, field.dependencies) + oneof_fields.append(field) + elif isinstance(field, Oneof): + deps: List[str] = [] + oneof = field + for f in oneof.fields: + deps.extend(f.dependencies) + local_message = Message(comment, self.compose_name(field.name,'choice'), [], [], {}, {}, deps) + index += len(field.fields) + local_message.oneofs.append(field) + new_field = Field(field.comment, '', local_message.name, '', '', field.name.split('.')[-1], i+index, deps) + message.messages[local_message.name] = local_message + oneof_fields.append(new_field) + elif isinstance(field, Enum): + enum = Enum(field.comment, self.compose_name(field.name,"options"), field.fields) + message.enums[enum.name] = enum + field = Field(field.comment, '', enum.name, '', '', field.name.split('.')[-1], i+index, []) + oneof_fields.append(field) + elif isinstance(field, Message): + local_message = Message(field.comment, self.compose_name(field.name,'type'), field.fields, field.oneofs, field.messages, field.enums, field.dependencies) + message.messages[local_message.name] = local_message + field = Field(field.comment, '', local_message.name, '', '', field.name.split('.')[-1], i+index, field.dependencies) + oneof_fields.append(field) + oneof = Oneof(comment, field_name, copy.deepcopy(oneof_fields)) + return oneof + else: + raise ValueError(f"Field {field_name} is a union type without any non-null types") + + if isinstance(field_type, dict): + # Nested types (e.g., records, enums) require special handling + if field_type['type'] == 'record': + return self.convert_record_type(field_type, comment, proto_files) + elif field_type['type'] == 'enum': + enum_symbols = {symbol: Field(comment, '', symbol, '', '', symbol, s, []) for s, symbol in enumerate(field_type['symbols'])} + return Enum(comment, field_type['name'], enum_symbols) + elif field_type['type'] == 'array': + converted_field_type = self.convert_field_type(message, self.compose_name(field_name, "item"), field_type['items'], comment, index, proto_files) + if isinstance(converted_field_type, Field): + return Field(comment, 'repeated', 'array', '', converted_field_type.type, field_name, index, converted_field_type.dependencies) + elif isinstance(converted_field_type, Enum): + enum = Enum(converted_field_type.comment, self.compose_name(converted_field_type.name,'enum'), converted_field_type.fields) + message.enums[enum.name] = enum + return Field(comment, 'repeated', 'array', '', enum.name, field_name, index, []) + elif isinstance(converted_field_type, Message): + local_message = Message(converted_field_type.comment, self.compose_name(converted_field_type.name,'type'), converted_field_type.fields, converted_field_type.oneofs, converted_field_type.messages, converted_field_type.enums, converted_field_type.dependencies) + message.messages[local_message.name] = local_message + return Field(comment, 'repeated', 'array', '', local_message.name, field_name, index, []) + elif isinstance(converted_field_type, Oneof): + deps3: List[str] = [] + fl = [] + for i, f in enumerate(converted_field_type.fields): + fl.append(Field(Comment('',{}), '', f.type, '', '', f.name, i+1, [])) + deps3.extend(f.dependencies) + oneof = Oneof(converted_field_type.comment, 'item', fl) + local_message = Message(comment, self.compose_name(field_name,'type'), [], [], {}, {}, deps3) + local_message.oneofs.append(oneof) + new_field = Field(Comment('',{}), 'repeated', 'array', '', local_message.name, field_name.split('.')[-1], index, local_message.dependencies) message.messages[local_message.name] = local_message - field = Field(field.comment, '', local_message.name, '', '', field.name.split('.')[-1], i+1) - oneof_fields.append(field) - elif isinstance(field, Enum): - enum = Enum(field.comment, field.name+"_enum", field.fields) + return new_field + elif field_type['type'] == 'map': + converted_field_type = self.convert_field_type(message, self.compose_name(field_name,'item', 'field'), field_type['values'], comment, index, proto_files) + if isinstance(converted_field_type, Field): + return Field(comment, label, 'map', 'string', converted_field_type.type, field_name, index, converted_field_type.dependencies) + elif isinstance(converted_field_type, Enum): + enum = Enum(converted_field_type.comment, self.compose_name(converted_field_type.name,'enum'), converted_field_type.fields) message.enums[enum.name] = enum - field = Field(field.comment, '', enum.name, '', '', field.name.split('.')[-1], i+1) - oneof_fields.append(field) - elif isinstance(field, Message): - local_message = Message(field.comment, field.name+"_type", field.fields, field.oneofs, field.messages, field.enums) + return Field(comment, label, 'map', 'string', enum.name, field_name, index, []) + elif isinstance(converted_field_type, Message): + local_message = Message(converted_field_type.comment, self.compose_name(converted_field_type.name,'type'), converted_field_type.fields, converted_field_type.oneofs, converted_field_type.messages, converted_field_type.enums, []) message.messages[local_message.name] = local_message - field = Field(field.comment, '', local_message.name, '', '', field.name.split('.')[-1], i+1) - oneof_fields.append(field) - oneof = Oneof(comment, field_name, copy.deepcopy(oneof_fields)) - return oneof - else: - raise ValueError(f"Field {field_name} is a union type without any non-null types") + return Field(comment, label, 'map', 'string', local_message.name, field_name, index, local_message.dependencies) + elif isinstance(converted_field_type, Oneof): + deps4: List[str] = [] + fl = [] + for i, f in enumerate(converted_field_type.fields): + fl.append(Field(Comment('',{}), '', f.type, '', '', f.name, i+1, [])) + deps4.extend(f.dependencies) + oneof = Oneof(converted_field_type.comment, 'item', fl) + local_message = Message(comment, self.compose_name(field_name, 'type'), [], [], {}, {}, deps4) + local_message.oneofs.append(oneof) + new_field = Field(Comment('',{}), label, 'map', 'string', local_message.name, field_name.split('.')[-1], index, local_message.dependencies) + message.messages[local_message.name] = local_message + return new_field + elif field_type['type'] == "fixed": + return Field(comment, label, 'fixed','string', 'string', field_name, index, []) + else: + deps1: List[str] = [] + proto_type = self.avro_primitive_to_proto_type(field_type['type'], deps1) + return Field(comment, label, proto_type, '', '', field_name, index, deps1) + elif isinstance(field_type, str): + deps2: List[str] = [] + proto_type = self.avro_primitive_to_proto_type(field_type, deps2) + return Field(comment, label, proto_type, '', '', field_name, index, deps2) + raise ValueError(f"Unknown field type {field_type}") - if isinstance(field_type, dict): - # Nested types (e.g., records, enums) require special handling - if field_type['type'] == 'record': - return convert_record_type(field_type, comment, proto_files) - elif field_type['type'] == 'enum': - return Enum(comment, field_type['name'], {symbol: i for i, symbol in enumerate(field_type['symbols'])}) - elif field_type['type'] == 'array': - return convert_field_type(message, field_name, field_type['items'], comment, 1, proto_files) - elif field_type['type'] == 'map': - return convert_field_type(message, field_name, field_type['values'], 1, comment, proto_files) - elif field_type['type'] == "fixed": - return Field(comment, label, 'fixed','string', 'string', field_name, index) + def avro_schema_to_proto_message(self, avro_schema: dict, proto_files: ProtoFiles) -> str: + comment = Comment('',{}) + if 'doc' in avro_schema: + comment = Comment(avro_schema["doc"], {}) + if avro_schema['type'] == 'record': + message = self.convert_record_type(avro_schema, comment, proto_files) + file = next((f for f in proto_files.files if f.package == avro_schema["namespace"]), None) + if not file: + file = ProtoFile({}, {}, {}, [], {}, avro_schema["namespace"]) + proto_files.files.append(file) + file.messages[message.name] = message + elif avro_schema['type'] == 'enum': + enum_name = avro_schema['name'] + enum_symbols = {symbol: Field(comment, '', symbol, '', '', symbol, s, []) for s, symbol in enumerate(avro_schema['symbols'])} + enum = Enum(comment, enum_name, enum_symbols) + file = next((f for f in proto_files.files if f.package == avro_schema["namespace"]), None) + if not file: + file = ProtoFile({}, {}, {}, [], {}, avro_schema["namespace"]) + proto_files.files.append(file) + file.enums[enum_name] = enum + return avro_schema["name"] + + def avro_schema_to_proto_messages(self, avro_schema_input, proto_files: ProtoFiles): + """Convert an Avro schema to Protobuf message definitions.""" + if not isinstance(avro_schema_input, list): + avro_schema_list = [avro_schema_input] else: - proto_type = avro_primitive_to_proto_type(field_type['type']) - return Field(comment, label, proto_type, '', '', field_name, index) - else: - proto_type = avro_primitive_to_proto_type(field_type) - return Field(comment, label, proto_type, '', '', field_name, index) + avro_schema_list = avro_schema_input + for avro_schema in avro_schema_list: + self.avro_schema_to_proto_message(avro_schema, proto_files) -def avro_schema_to_proto_message(avro_schema: dict, proto_files: ProtoFiles) -> str: - comment = Comment('',{}) - if 'doc' in avro_schema: - comment = Comment(avro_schema["doc"], {}) - if avro_schema['type'] == 'record': - message = convert_record_type(avro_schema, comment, proto_files) - file = next((f for f in proto_files.files if f.package == avro_schema["namespace"]), None) - if not file: - file = ProtoFile({}, {}, {}, [], {}, avro_schema["namespace"]) - proto_files.files.append(file) - file.messages[message.name] = message - elif avro_schema['type'] == 'enum': - enum_name = avro_schema['name'] - enum = Enum(comment, enum_name, {symbol: i for i, symbol in enumerate(avro_schema['symbols'])}) - file = next((f for f in proto_files.files if f.package == avro_schema["namespace"]), None) - if not file: - file = ProtoFile({}, {}, {}, [], {}, avro_schema["namespace"]) - proto_files.files.append(file) - file.enums[enum_name] = enum - return avro_schema["name"] - -def avro_schema_to_proto_messages(avro_schema_input, proto_files: ProtoFiles): - """Convert an Avro schema to Protobuf message definitions.""" - if not isinstance(avro_schema_input, list): - avro_schema_list = [avro_schema_input] - else: - avro_schema_list = avro_schema_input - for avro_schema in avro_schema_list: - avro_schema_to_proto_message(avro_schema, proto_files) + def save_proto_to_file(self, proto_files: ProtoFiles, proto_path): + """Save the Protobuf schema to a file.""" + for proto in proto_files.files: + # gather dependencies that are within the package + deps: List[str] = [] + for message in proto.messages.values(): + for dep in message.dependencies: + if '.' in dep: + deps.append(dep.rsplit('.',1)[0]) + deps = list(set(deps)) -def save_proto_to_file(proto_files: ProtoFiles, proto_path): - """Save the Protobuf schema to a file.""" - for proto in proto_files.files: + #proto.imports.extend([f.package[len(proto.package)+1:] for f in proto_files.files if f.package.startswith(proto.package) and f.package != proto.package]) + proto.imports.extend([d for d in deps if d != proto.package]) + proto_file_path = os.path.join(proto_path, f"{proto.package}.proto") + # create the directory for the proto file if it doesn't exist + proto_dir = os.path.dirname(proto_file_path) + if not os.path.exists(proto_dir): + os.makedirs(proto_dir) + with open(proto_file_path, 'w') as proto_file: + # dump the ProtoFile structure in proto syntax + proto_str = f'syntax = "proto3";\n\n' + proto_str += f'package {proto.package};\n\n' - proto.imports.extend([f.package[len(proto.package)+1:] for f in proto_files.files if f.package.startswith(proto.package) and f.package != proto.package]) - proto_file_path = os.path.join(proto_path, f"{proto.package}.proto") - # create the directory for the proto file if it doesn't exist - proto_dir = os.path.dirname(proto_file_path) - if not os.path.exists(proto_dir): - os.makedirs(proto_dir) - with open(proto_file_path, 'w') as proto_file: - # dump the ProtoFile structure in proto syntax - proto_str = f'syntax = "proto3";\n\n' - proto_str += f'package {proto.package};\n\n' + for import_package in proto.imports: + proto_str += f"import \"{import_package}.proto\";\n" + if (len(proto.imports)): + proto_str += "\n" + for enum_name, enum in proto.enums.items(): + proto_str += f"enum {enum_name} {{\n" + for _, field in enum.fields.items(): + proto_str += f"{indent}{field.name} = {field.number};\n" + proto_str += "}\n\n" + for message in proto.messages.values(): + proto_str += self.render_message(message) + for service in proto.services.values(): + proto_str += f"service {service.name} {{\n" + for function_name, func in service.functions.items(): + proto_str += f"{indent}rpc {func.name} ({func.in_type}) returns ({func.out_type}) {{\n" + proto_str += f"{indent}{indent}option (google.api.http) = {{\n" + proto_str += f"{indent}{indent}{indent}post: \"{func.uri}\"\n" + proto_str += f"{indent}{indent}}};\n" + proto_str += f"{indent}}};\n" + proto_str += "}\n\n" + proto_file.write(proto_str) - for import_package in proto.imports: - proto_str += f"import \"{proto.package}.{import_package}.proto\";\n" - if (len(proto.imports)): - proto_str += "\n" - for enum in proto.enums.values(): - proto_str += f"enum {enum.name} {{\n" - for field_name, val in enum.fields.items(): - proto_str += f"{indent}{field_name} = {val};\n" - proto_str += "}\n\n" - for message in proto.messages.values(): - proto_str += render_message(message) - for service in proto.services.values(): - proto_str += f"service {service.name} {{\n" - for function_name, func in service.functions.items(): - proto_str += f"{indent}rpc {func.name} ({func.in_type}) returns ({func.out_type}) {{\n" - proto_str += f"{indent}{indent}option (google.api.http) = {{\n" - proto_str += f"{indent}{indent}{indent}post: \"{func.uri}\"\n" - proto_str += f"{indent}{indent}}};\n" - proto_str += f"{indent}}};\n" - proto_str += "}\n\n" - proto_file.write(proto_str) - -def render_message(message, level=0) -> str: - proto_str = f"{indent*level}message {message.name} {{\n" - for field in message.fields: - proto_str += f"{indent*level}{indent}{field.type} {field.name} = {field.number};\n" - for oneof in message.oneofs: - proto_str += f"{indent*level}{indent}oneof {oneof.name} {{\n" - for field in oneof.fields: - proto_str += f"{indent*level}{indent}{indent}{field.type} {field.name} = {field.number};\n" - proto_str += f"{indent*level}{indent}}}\n" - for enum in message.enums.values(): - proto_str += f"{indent*level}{indent}enum {enum.name} {{\n" - for field_name, val in enum.fields.items(): - proto_str += f"{indent*level}{indent}{indent}{field_name} = {val};\n" - proto_str += f"{indent*level}{indent}}}\n" - for local_message in message.messages.values(): - proto_str += render_message(local_message, level+1) - proto_str += f"{indent*level}}}\n\n" - return proto_str - + def render_message(self, message, level=0) -> str: + proto_str = f"{indent*level}message {message.name} {{\n" + fieldsAndOneofs = message.fields+message.oneofs + fieldsAndOneofs.sort(key=lambda f: f.number if isinstance(f, Field) else f.fields[0].number) + for fo in fieldsAndOneofs: + if isinstance(fo, Field): + field = fo + if field.type == "map": + proto_str += f"{indent*level}{indent}{field.label}{' ' if field.label else ''}map<{field.key_type}, {field.val_type}> {field.name} = {field.number};\n" + elif field.type == "array": + proto_str += f"{indent*level}{indent}{field.label}{' ' if field.label else ''}{field.val_type} {field.name} = {field.number};\n" + else: + proto_str += f"{indent*level}{indent}{field.label}{' ' if field.label else ''}{field.type} {field.name} = {field.number};\n" + else: + oneof = fo + proto_str += f"{indent*level}{indent}oneof {oneof.name} {{\n" + for field in oneof.fields: + proto_str += f"{indent*level}{indent}{indent}{field.label}{' ' if field.label else ''}{field.type} {field.name} = {field.number};\n" + proto_str += f"{indent*level}{indent}}}\n" + for enum in message.enums.values(): + proto_str += f"{indent*level}{indent}enum {enum.name} {{\n" + for _, field in enum.fields.items(): + proto_str += f"{indent*level}{indent}{indent}{field.label}{' ' if field.label else ''}{field.name} = {field.number};\n" + proto_str += f"{indent*level}{indent}}}\n" + for local_message in message.messages.values(): + proto_str += self.render_message(local_message, level+1) + proto_str += f"{indent*level}}}\n" + return proto_str + -def convert_avro_to_proto(avro_schema_path, proto_file_path): - """Convert Avro schema file to Protobuf .proto file.""" - with open(avro_schema_path, 'r') as avro_file: - avro_schema = json.load(avro_file) - proto_files = ProtoFiles([]) - avro_schema_to_proto_messages(avro_schema, proto_files) - save_proto_to_file(proto_files, proto_file_path) + def convert_avro_to_proto(self, avro_schema_path, proto_file_path): + """Convert Avro schema file to Protobuf .proto file.""" + with open(avro_schema_path, 'r') as avro_file: + avro_schema = json.load(avro_file) + proto_files = ProtoFiles([]) + self.avro_schema_to_proto_messages(avro_schema, proto_files) + self.save_proto_to_file(proto_files, proto_file_path) +def convert_avro_to_proto(avro_schema_path, proto_file_path, naming_mode: Literal['snake', 'pascal', 'camel'] = 'pascal', allow_optional: bool = False): + avrotoproto = AvroToProto() + avrotoproto.naming_mode = naming_mode + avrotoproto.allow_optional = allow_optional + avrotoproto.convert_avro_to_proto(avro_schema_path, proto_file_path) \ No newline at end of file diff --git a/avrotize/jsonstoavro.py b/avrotize/jsonstoavro.py index 9e68c78..d95b742 100644 --- a/avrotize/jsonstoavro.py +++ b/avrotize/jsonstoavro.py @@ -1,6 +1,7 @@ import json import os import re +from typing import Any, Dict, List import jsonpointer import requests import copy @@ -32,9 +33,9 @@ def is_empty_type(self, avro_type): return True return False - def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str = None) -> str | list | dict: + def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str | None = None) -> str | list | dict: """Merge multiple Avro type schemas into one.""" - merged_schema = {} + merged_schema: Dict[str, list, dict] = {} schemas = [] for schema_entry in schemas_arg: if isinstance(schema_entry, list): @@ -62,7 +63,7 @@ def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str = for schema in schemas: if 'dependencies' in schema: - deps = merged_schema.get('dependencies', []) + deps: List[str] = merged_schema.get('dependencies', []) deps.extend(schema['dependencies']) merged_schema['dependencies'] = deps if (isinstance(schema, list) or isinstance(schema, dict)) and len(schema) == 0: @@ -97,8 +98,8 @@ def merge_schemas(self, schemas_arg: list, avro_schemas: list, type_name: str = return merged_schema - def generic_type(self) -> dict: - simple_type_union = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"] + def generic_type(self) -> list[str | dict]: + simple_type_union: list[str | dict] = ["null", "boolean", "int", "long", "float", "double", "bytes", "string"] l2 = simple_type_union.copy() l2.extend([ { @@ -121,7 +122,7 @@ def generic_type(self) -> dict: }]) return l1 - def ensure_type(self, type: dict | str | list) -> dict | str: + def ensure_type(self, type: dict | str | list) -> dict | str | list: if isinstance(type, str) or isinstance(type, list) or 'type' in type: return type @@ -129,7 +130,7 @@ def ensure_type(self, type: dict | str | list) -> dict | str: return type - def json_schema_primitive_to_avro_type(self, json_primitive: str | list, format: str, enum: list, field_name: str, dependencies: list) -> str: + def json_schema_primitive_to_avro_type(self, json_primitive: str | list, format: str | None, enum: list | None, field_name: str, dependencies: list) -> str | dict[str,Any] | list: """Convert a JSON-schema primitive type to Avro primitive type.""" if isinstance(json_primitive, list): @@ -173,8 +174,6 @@ def json_schema_primitive_to_avro_type(self, json_primitive: str | list, format: avro_primitive = {"type": "enum", "symbols": enum, "name": self.avro_name(field_name + "_enum")} else: avro_primitive = "string" - - return avro_primitive @@ -244,10 +243,10 @@ def resolve_reference(self, json_type: dict, base_uri: str, json_doc: dict): - def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_name: str, namespace : str, dependencies: list, json_schema: dict, base_uri: str, avro_schema: list, record_stack: list) -> dict: + def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_name: str, namespace : str, dependencies: list, json_schema: dict, base_uri: str, avro_schema: list, record_stack: list) -> dict | list | str: """Convert a JSON type to Avro type.""" - avro_type = {} + avro_type: list | dict | str = {} local_name = self.avro_name(field_name if field_name else record_name) qualified_name = namespace + "." + local_name @@ -311,7 +310,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if len(json_types) > 0: if len(json_types) == 1: avro_type = self.json_type_to_avro_type(json_types[0], record_name, field_name, namespace, dependencies, json_schema, base_uri, avro_schema, record_stack) - if self.is_empty_type(avro_type): + if isinstance(avro_type, dict) and self.is_empty_type(avro_type): avro_type['type'] = self.generic_type() return avro_type else: @@ -338,10 +337,10 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ if 'properties' in json_type and not 'type' in json_type: json_type['type'] = 'object' - if 'description' in json_type: + if 'description' in json_type and isinstance(avro_type, dict): avro_type['doc'] = json_type['description'] - if 'title' in json_type: + if 'title' in json_type and isinstance(avro_type, dict): avro_type['name'] = self.avro_name(json_type['title']) # first, pull in any referenced definitions and merge with this schema @@ -369,7 +368,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ # registering in imported_types ahead of resolving to prevent circular references self.imported_types[ref] = type_name # resolve type - deps = [] + deps: List[str] = [] avro_type = self.json_type_to_avro_type(resolved_json_type, type_name, field_name, type_namespace, deps, json_schema, new_base_uri, avro_schema, record_stack) if isinstance(avro_type, list) or (not isinstance(avro_type, dict) or not avro_type.get('type') == "record"): if isinstance(avro_type, dict) and not 'type' in avro_type: @@ -404,7 +403,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ json_type.update(resolved_json_type) del json_type['$ref'] avro_type = self.json_type_to_avro_type(json_type, record_name, field_name, namespace, dependencies, json_schema, new_base_uri, avro_schema, record_stack) - if 'name' in json_type: + if isinstance(avro_type, dict) and 'name' in avro_type: self.imported_types[ref] = avro_type['name'] else: self.imported_types[ref] = avro_type @@ -412,7 +411,7 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ # if 'const' is present, make this an enum if 'const' in json_type: const = json_type['const'] - avro_type = self.merge_schemas([avro_type, {"type": "enum", "symbols": [const], "name": self.avro_name(avro_type.get('name', local_name)) }], avro_schema, avro_type.get('name', local_name)) + avro_type = self.merge_schemas([avro_type, {"type": "enum", "symbols": [const], "name": self.avro_name(local_name)}], avro_schema, local_name) json_object_type = json_type.get('type') if json_object_type: @@ -424,14 +423,16 @@ def json_type_to_avro_type(self, json_type: str | dict, record_name: str, field_ elif json_object_type == 'object' or 'object' in json_object_type: record_stack.append(record_name) avro_type = self.merge_schemas([avro_type, self.json_schema_object_to_avro_record(local_name, json_type, namespace, json_schema, base_uri, avro_schema, record_stack)], avro_schema, avro_type.get('name', local_name)) - if 'dependencies' in avro_type: + if isinstance(avro_type, dict) and 'dependencies' in avro_type: dependencies.extend(avro_type['dependencies']) del avro_type['dependencies'] record_stack.pop() else: + local_name = field_name = field_name if field_name else local_name+"_value" avro_type = self.json_schema_primitive_to_avro_type(json_object_type, json_type.get('format'), json_type.get('enum'), field_name, dependencies) elif 'enum' in json_type: - enum_namespace = namespace + '.' + record_stack[-1] + field_name + local_name = field_name = field_name if field_name else local_name + enum_namespace = namespace + '.' + record_stack[-1] + field_name avro_type = self.merge_schemas([avro_type,self.json_schema_primitive_to_avro_type("string", json_type.get('format'), json_type.get('enum'), field_name, dependencies)], avro_schema, avro_type.get('name', local_name)) if isinstance(avro_type, dict): avro_type['namespace'] = enum_namespace