Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dependencies/references support test branch #28

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ __pycache__/
/kafka_*/
venv
/karapace/version.py
.run
.python-version
57 changes: 57 additions & 0 deletions karapace/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - dependency

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.schema_references import Reference
from karapace.typing import JsonData, Subject, Version
from typing import Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_models import ValidatedTypedSchema


class DependencyVerifierResult:
def __init__(self, result: bool, message: Optional[str] = "") -> None:
self.result = result
self.message = message


class Dependency:
def __init__(self, name: str, subject: Subject, version: Version, target_schema: "ValidatedTypedSchema") -> None:
self.name = name
self.subject = subject
self.version = version
self.schema = target_schema

def get_schema(self) -> "ValidatedTypedSchema":
return self.schema

@staticmethod
def of(reference: Reference, target_schema: "ValidatedTypedSchema") -> "Dependency":
return Dependency(reference.name, reference.subject, reference.version, target_schema)

def to_dict(self) -> JsonData:
return {
"name": self.name,
"subject": self.subject,
"version": self.version,
}

def identifier(self) -> str:
return self.name + "_" + self.subject + "_" + str(self.version)

def __hash__(self) -> int:
return hash((self.name, self.subject, self.version, self.schema))

def __eq__(self, other: Any) -> bool:
if other is None or not isinstance(other, Dependency):
return False
return (
self.name == other.name
and self.subject == other.subject
and self.version == other.version
and self.schema == other.schema
)
17 changes: 17 additions & 0 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.schema_references import Referents
from karapace.typing import Version


class VersionNotFoundException(Exception):
Expand All @@ -20,10 +22,18 @@ class InvalidSchema(Exception):
pass


class InvalidTest(Exception):
pass


class InvalidSchemaType(Exception):
pass


class InvalidReferences(Exception):
pass


class SchemasNotFoundException(Exception):
pass

Expand All @@ -44,6 +54,13 @@ class SubjectNotSoftDeletedException(Exception):
pass


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: Referents, version: Version):
super().__init__()
self.version = version
self.referenced_by = referenced_by


class SubjectSoftDeletedException(Exception):
pass

Expand Down
31 changes: 30 additions & 1 deletion karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from threading import Lock, RLock
from typing import Dict, List, Optional, Tuple
Expand All @@ -28,6 +29,7 @@ def __init__(self) -> None:
self.subjects: Dict[Subject, SubjectData] = {}
self.schemas: Dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: Dict[Tuple[Subject, ResolvedVersion], Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -96,7 +98,14 @@ def get_next_version(self, *, subject: Subject) -> ResolvedVersion:
return max(self.subjects[subject].schemas) + 1

def insert_schema_version(
self, *, subject: Subject, schema_id: SchemaId, version: ResolvedVersion, deleted: bool, schema: TypedSchema
self,
*,
subject: Subject,
schema_id: SchemaId,
version: ResolvedVersion,
deleted: bool,
schema: TypedSchema,
references: List[Reference],
) -> None:
with self.schema_lock_thread:
self.global_schema_id = max(self.global_schema_id, schema_id)
Expand All @@ -119,6 +128,7 @@ def insert_schema_version(
deleted=deleted,
schema_id=schema_id,
schema=schema,
references=references,
)

if not deleted:
Expand Down Expand Up @@ -235,3 +245,22 @@ def num_schema_versions(self) -> Tuple[int, int]:
else:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[(subject, version)] = [schema_id]

def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Optional[Referents]:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

def remove_referenced_by(self, schema_id: SchemaId, references: List[Reference]) -> None:
with self.schema_lock_thread:
for ref in references:
key = (ref.subject, ref.version)
if self.referenced_by.get(key, None) and schema_id in self.referenced_by[key]:
self.referenced_by[key].remove(schema_id)
73 changes: 73 additions & 0 deletions karapace/protobuf/compare_type_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
karapace - compare_type_lists

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from itertools import chain
from karapace.protobuf.compare_result import CompareResult, Modification
from karapace.protobuf.compare_type_storage import CompareTypes
from karapace.protobuf.enum_element import EnumElement
from karapace.protobuf.exception import IllegalStateException
from karapace.protobuf.message_element import MessageElement
from karapace.protobuf.type_element import TypeElement
from typing import List


def compare_type_lists(
self_types_list: List[TypeElement],
other_types_list: List[TypeElement],
result: CompareResult,
compare_types: CompareTypes,
) -> CompareResult:
self_types = {}
other_types = {}
self_indexes = {}
other_indexes = {}

type_: TypeElement
for i, type_ in enumerate(self_types_list):
self_types[type_.name] = type_
self_indexes[type_.name] = i
compare_types.add_self_type(compare_types.self_package_name, type_)

for i, type_ in enumerate(other_types_list):
other_types[type_.name] = type_
other_indexes[type_.name] = i
compare_types.add_other_type(compare_types.other_package_name, type_)

for name in chain(self_types.keys(), other_types.keys() - self_types.keys()):
result.push_path(str(name), True)

if self_types.get(name) is None and other_types.get(name) is not None:
if isinstance(other_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_ADD)
elif isinstance(other_types[name], EnumElement):
result.add_modification(Modification.ENUM_ADD)
else:
raise IllegalStateException("Instance of element is not applicable")
elif self_types.get(name) is not None and other_types.get(name) is None:
if isinstance(self_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_DROP)
elif isinstance(self_types[name], EnumElement):
result.add_modification(Modification.ENUM_DROP)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if other_indexes[name] != self_indexes[name]:
if isinstance(self_types[name], MessageElement):
# incompatible type
result.add_modification(Modification.MESSAGE_MOVE)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if isinstance(self_types[name], MessageElement) and isinstance(other_types[name], MessageElement):
self_types[name].compare(other_types[name], result, compare_types)
elif isinstance(self_types[name], EnumElement) and isinstance(other_types[name], EnumElement):
self_types[name].compare(other_types[name], result, compare_types)
else:
# incompatible type
result.add_modification(Modification.TYPE_ALTER)
result.pop_path(True)

return result
19 changes: 13 additions & 6 deletions karapace/protobuf/compare_type_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def compute_name(t: ProtoType, result_path: List[str], package_name: str, types:

class CompareTypes:
def __init__(self, self_package_name: str, other_package_name: str, result: CompareResult) -> None:
self.self_package_name = self_package_name
self.other_package_name = other_package_name
self.self_package_name = self_package_name or ""
self.other_package_name = other_package_name or ""

self.self_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.other_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.locked_messages: List["MessageElement"] = []
Expand Down Expand Up @@ -93,17 +94,23 @@ def self_type_short_name(self, t: ProtoType) -> Optional[str]:
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.self_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
return name
if name.startswith(package_name):
return name[(len(package_name) + 1) :]
return name

def other_type_short_name(self, t: ProtoType) -> Optional[str]:
name = compute_name(t, self.result.path, self.other_package_name, self.other_types)
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.other_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
return name
if name.startswith(package_name):
return name[(len(package_name) + 1) :]
return name

def lock_message(self, message: "MessageElement") -> bool:
Expand Down
63 changes: 63 additions & 0 deletions karapace/protobuf/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
karapace - dependency

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.dependency import DependencyVerifierResult
from karapace.protobuf.known_dependency import DependenciesHardcoded, KnownDependency
from karapace.protobuf.one_of_element import OneOfElement
from typing import List


class ProtobufDependencyVerifier:
def __init__(self) -> None:
self.declared_types: List[str] = []
self.used_types: List[str] = []
self.import_path: List[str] = []

def add_declared_type(self, full_name: str) -> None:
self.declared_types.append(full_name)

def add_used_type(self, parent: str, element_type: str) -> None:
if element_type.find("map<") == 0:
end = element_type.find(">")
virgule = element_type.find(",")
key = element_type[4:virgule]
value = element_type[virgule + 1 : end]
value = value.strip()
self.used_types.append(parent + ";" + key)
self.used_types.append(parent + ";" + value)
else:
self.used_types.append(parent + ";" + element_type)

def add_import(self, import_name: str) -> None:
self.import_path.append(import_name)

def verify(self) -> DependencyVerifierResult:
declared_index = set(self.declared_types)
for used_type in self.used_types:
delimiter = used_type.rfind(";")
used_type_with_scope = ""
if delimiter != -1:
used_type_with_scope = used_type[:delimiter] + "." + used_type[delimiter + 1 :]
used_type = used_type[delimiter + 1 :]

if not (
used_type in DependenciesHardcoded.index
or KnownDependency.index_simple.get(used_type) is not None
or KnownDependency.index.get(used_type) is not None
or used_type in declared_index
or (delimiter != -1 and used_type_with_scope in declared_index)
or "." + used_type in declared_index
):
return DependencyVerifierResult(False, f"type {used_type} is not defined")

return DependencyVerifierResult(True)


def process_one_of(verifier: ProtobufDependencyVerifier, package_name: str, parent_name: str, one_of: OneOfElement) -> None:
parent = package_name + "." + parent_name
for field in one_of.fields:
verifier.add_used_type(parent, field.element_type)
8 changes: 4 additions & 4 deletions karapace/protobuf/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
import json


class ProtobufParserRuntimeException(Exception):
pass


class IllegalStateException(Exception):
pass

Expand All @@ -29,6 +25,10 @@ class ProtobufTypeException(Error):
"""Generic Protobuf type error."""


class ProtobufUnresolvedDependencyException(ProtobufException):
"""a Protobuf schema has unresolved dependency"""


class SchemaParseException(ProtobufException):
"""Error while parsing a Protobuf schema descriptor."""

Expand Down
7 changes: 7 additions & 0 deletions karapace/protobuf/field_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,17 @@ def compare_message(

self_type_record = types.get_self_type(self_type)
other_type_record = types.get_other_type(other_type)

self_type_element: MessageElement = self_type_record.type_element
other_type_element: MessageElement = other_type_record.type_element

if types.self_type_short_name(self_type) != types.other_type_short_name(other_type):
result.add_modification(Modification.FIELD_NAME_ALTER)
else:
self_type_element.compare(other_type_element, result, types)

def __repr__(self):
return f"{self.element_type} {self.name} = {self.tag}"

def __str__(self):
return f"{self.element_type} {self.name} = {self.tag}"
Loading