Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr3 merge test #29

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a30bdf7
Add support of references/dependencies
libretto Oct 5, 2022
7c71af4
add support of dependencies to compatibility check
libretto Oct 7, 2022
c85ea89
add few tests for workaround
libretto Oct 18, 2022
62c3b0f
Merge branch 'pr3x' into pr2_clear
libretto Oct 18, 2022
bdb677c
fix bugs and add more tests
libretto Oct 18, 2022
492fe93
Add support of references/dependencies
libretto Oct 5, 2022
d3a5c3b
Merge branch 'instaclustr-pr2_clear' into protobuf-schema-references-…
jjaakola-aiven Oct 20, 2022
7c36fe0
WIP: deleted proto files
jjaakola-aiven Oct 18, 2022
984d794
WIP: Add tests and fixes for protobuf references support
jjaakola-aiven Oct 20, 2022
0c2b5f0
WIP: fix schema test assumed error message
jjaakola-aiven Oct 21, 2022
20d5235
WIP: SchemaRegistryClient to load references
jjaakola-aiven Oct 21, 2022
2786858
fix schema deletion when references used
jjaakola-aiven Oct 21, 2022
437f7bd
bugfixes
libretto Oct 21, 2022
856b720
Merge branch 'aiven:main' into pr2_clear
libretto Oct 21, 2022
d524e0f
remove head dots from known_dependency.py full type name
libretto Oct 21, 2022
0fdb7e1
Merge remote-tracking branch 'refs/remotes/origin/pr2_clear' into pr2…
libretto Oct 21, 2022
6201bef
sync with aiven references branch
libretto Oct 31, 2022
ecb22b9
fixup bugs
libretto Nov 1, 2022
05d5588
Merge branch 'protobuf-schema-references-support' into pr2_clear
libretto Nov 1, 2022
b2a796d
fixup
libretto Nov 3, 2022
5bf9261
Merge branch 'pr2_clear' of https://github.com/instaclustr/karapace i…
libretto Nov 3, 2022
d82c25b
apply compatibility check with depependencies on top of jjaakola chan…
libretto Nov 9, 2022
2977a2d
add references support to serialization/deserialization with tests
libretto Dec 5, 2022
a9c8fec
fixup repetetive generation of .proto files for serialization and add…
libretto Dec 5, 2022
e4a3eb8
merge with master
Mar 1, 2023
21d936b
Merge remote-tracking branch 'origin' into pr3_merge
Mar 1, 2023
6bfca8f
Merge branch 'master' into pr3_merge
Mar 2, 2023
7aebbe0
merge workaround
libretto Mar 2, 2023
d0ded16
fixup merge issues
libretto Mar 2, 2023
3d5a215
debugg workaround
libretto Mar 8, 2023
afbff90
debuggina
libretto Mar 13, 2023
e8888b5
merge with master
libretto Mar 13, 2023
a307d26
debug
libretto Mar 14, 2023
73122df
fixup merge issue
libretto Mar 14, 2023
7d6bdf8
python 3.9 module requirement
libretto Mar 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ __pycache__/
/kafka_*/
venv
/karapace/version.py
.run
50 changes: 50 additions & 0 deletions karapace/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from karapace.schema_references import Reference
from karapace.typing import JsonData, Subject, Version
from typing import Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from karapace.schema_models import ValidatedTypedSchema


class DependencyVerifierResult:
def __init__(self, result: bool, message: Optional[str] = "") -> None:
self.result = result
self.message = message


class Dependency:
def __init__(self, name: str, subject: Subject, version: Version, target_schema: "ValidatedTypedSchema") -> None:
self.name = name
self.subject = subject
self.version = version
self.schema = target_schema

def get_schema(self) -> "ValidatedTypedSchema":
return self.schema

@staticmethod
def of(reference: Reference, target_schema: "ValidatedTypedSchema") -> "Dependency":
return Dependency(reference.name, reference.subject, reference.version, target_schema)

def to_dict(self) -> JsonData:
return {
"name": self.name,
"subject": self.subject,
"version": self.version,
}

def identifier(self) -> str:
return self.name + "_" + self.subject + "_" + str(self.version)

def __hash__(self) -> int:
return hash((self.name, self.subject, self.version, self.schema))

def __eq__(self, other: Any) -> bool:
if other is None or not isinstance(other, Dependency):
return False
return (
self.name == other.name
and self.subject == other.subject
and self.version == other.version
and self.schema == other.schema
)
13 changes: 13 additions & 0 deletions karapace/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.typing import Version
from typing import List


class VersionNotFoundException(Exception):
Expand All @@ -24,6 +26,10 @@ class InvalidSchemaType(Exception):
pass


class InvalidReferences(Exception):
pass


class SchemasNotFoundException(Exception):
pass

Expand All @@ -44,6 +50,13 @@ class SubjectNotSoftDeletedException(Exception):
pass


class ReferenceExistsException(Exception):
def __init__(self, referenced_by: List, version: Version):
super().__init__()
self.version = version
self.referenced_by = referenced_by


class SubjectSoftDeletedException(Exception):
pass

Expand Down
46 changes: 45 additions & 1 deletion karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"""
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.utils import reference_key
from threading import Lock, RLock
from typing import Dict, List, Optional, Tuple

Expand All @@ -28,6 +30,7 @@ def __init__(self) -> None:
self.subjects: Dict[Subject, SubjectData] = {}
self.schemas: Dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: Dict[str, Referents] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -96,7 +99,14 @@ def get_next_version(self, *, subject: Subject) -> ResolvedVersion:
return max(self.subjects[subject].schemas) + 1

def insert_schema_version(
self, *, subject: Subject, schema_id: SchemaId, version: ResolvedVersion, deleted: bool, schema: TypedSchema
self,
*,
subject: Subject,
schema_id: SchemaId,
version: ResolvedVersion,
deleted: bool,
schema: TypedSchema,
references: List[Reference],
) -> None:
with self.schema_lock_thread:
self.global_schema_id = max(self.global_schema_id, schema_id)
Expand All @@ -119,6 +129,7 @@ def insert_schema_version(
deleted=deleted,
schema_id=schema_id,
schema=schema,
references=references,
)

if not deleted:
Expand Down Expand Up @@ -235,3 +246,36 @@ def num_schema_versions(self) -> Tuple[int, int]:
else:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: ResolvedVersion, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
ref_str = reference_key(subject, version)
referents = self.referenced_by.get(ref_str, None)
if referents:
referents.append(schema_id)
else:
self.referenced_by[ref_str] = [schema_id]

def get_referenced_by(self, subject: Subject, version: ResolvedVersion) -> Optional[Referents]:
with self.schema_lock_thread:
ref_str = reference_key(subject, version)
return self.referenced_by.get(ref_str, None)

def remove_referenced_by(self, schema_id: SchemaId, references: List[Reference]) -> None:
with self.schema_lock_thread:
for ref in references:
key = reference_key(ref.subject, ref.version)
if self.referenced_by.get(key, None) and schema_id in self.referenced_by[key]:
self.referenced_by[key].remove(schema_id)

def find_all_schemas(self, *, subject: Subject, include_deleted: bool) -> Dict[ResolvedVersion, SchemaVersion]:
if subject not in self.subjects:
return {}
if include_deleted:
return self.subjects[subject].schemas
with self.schema_lock_thread:
return {
version_id: schema_version
for version_id, schema_version in self.subjects[subject].schemas.items()
if schema_version.deleted is False
}
68 changes: 68 additions & 0 deletions karapace/protobuf/compare_type_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from itertools import chain
from karapace.protobuf.compare_result import CompareResult, Modification
from karapace.protobuf.compare_type_storage import CompareTypes
from karapace.protobuf.enum_element import EnumElement
from karapace.protobuf.exception import IllegalStateException
from karapace.protobuf.message_element import MessageElement
from karapace.protobuf.type_element import TypeElement
from typing import List


def compare_type_lists(
self_types_list: List[TypeElement],
other_types_list: List[TypeElement],
result: CompareResult,
compare_types: CompareTypes,
) -> CompareResult:
self_types = {}
other_types = {}
self_indexes = {}
other_indexes = {}

type_: TypeElement
for i, type_ in enumerate(self_types_list):
self_types[type_.name] = type_
self_indexes[type_.name] = i
compare_types.add_self_type(compare_types.self_package_name, type_)

for i, type_ in enumerate(other_types_list):
other_types[type_.name] = type_
other_indexes[type_.name] = i
compare_types.add_other_type(compare_types.other_package_name, type_)

for name in chain(self_types.keys(), other_types.keys() - self_types.keys()):

result.push_path(str(name), True)

if self_types.get(name) is None and other_types.get(name) is not None:
if isinstance(other_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_ADD)
elif isinstance(other_types[name], EnumElement):
result.add_modification(Modification.ENUM_ADD)
else:
raise IllegalStateException("Instance of element is not applicable")
elif self_types.get(name) is not None and other_types.get(name) is None:
if isinstance(self_types[name], MessageElement):
result.add_modification(Modification.MESSAGE_DROP)
elif isinstance(self_types[name], EnumElement):
result.add_modification(Modification.ENUM_DROP)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if other_indexes[name] != self_indexes[name]:
if isinstance(self_types[name], MessageElement):
# incompatible type
result.add_modification(Modification.MESSAGE_MOVE)
else:
raise IllegalStateException("Instance of element is not applicable")
else:
if isinstance(self_types[name], MessageElement) and isinstance(other_types[name], MessageElement):
self_types[name].compare(other_types[name], result, compare_types)
elif isinstance(self_types[name], EnumElement) and isinstance(other_types[name], EnumElement):
self_types[name].compare(other_types[name], result, compare_types)
else:
# incompatible type
result.add_modification(Modification.TYPE_ALTER)
result.pop_path(True)

return result
20 changes: 14 additions & 6 deletions karapace/protobuf/compare_type_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def compute_name(t: ProtoType, result_path: List[str], package_name: str, types:
class CompareTypes:
def __init__(self, self_package_name: str, other_package_name: str, result: CompareResult) -> None:

self.self_package_name = self_package_name
self.other_package_name = other_package_name
self.self_package_name = self_package_name or ""
self.other_package_name = other_package_name or ""

self.self_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.other_types: Dict[str, Union[TypeRecord, TypeRecordMap]] = {}
self.locked_messages: List["MessageElement"] = []
Expand Down Expand Up @@ -94,17 +95,24 @@ def self_type_short_name(self, t: ProtoType) -> Optional[str]:
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.self_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
package_name = ""
if name.startswith(package_name):
return name[(len(package_name) + 1) :]

return name

def other_type_short_name(self, t: ProtoType) -> Optional[str]:
name = compute_name(t, self.result.path, self.other_package_name, self.other_types)
if name is None:
raise IllegalArgumentException(f"Cannot determine message type {t}")
type_record: TypeRecord = self.other_types.get(name)
if name.startswith(type_record.package_name):
return name[(len(type_record.package_name) + 1) :]
package_name = type_record.package_name
if package_name is None:
package_name = ""
if name.startswith(package_name):
return name[(len(package_name) + 1) :]
return name

def lock_message(self, message: "MessageElement") -> bool:
Expand Down
56 changes: 56 additions & 0 deletions karapace/protobuf/dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from karapace.dependency import DependencyVerifierResult
from karapace.protobuf.known_dependency import DependenciesHardcoded, KnownDependency
from karapace.protobuf.one_of_element import OneOfElement
from typing import List


class ProtobufDependencyVerifier:
def __init__(self) -> None:
self.declared_types: List[str] = []
self.used_types: List[str] = []
self.import_path: List[str] = []

def add_declared_type(self, full_name: str) -> None:
self.declared_types.append(full_name)

def add_used_type(self, parent: str, element_type: str) -> None:
if element_type.find("map<") == 0:
end = element_type.find(">")
virgule = element_type.find(",")
key = element_type[4:virgule]
value = element_type[virgule + 1 : end]
value = value.strip()
self.used_types.append(parent + ";" + key)
self.used_types.append(parent + ";" + value)
else:
self.used_types.append(parent + ";" + element_type)

def add_import(self, import_name: str) -> None:
self.import_path.append(import_name)

def verify(self) -> DependencyVerifierResult:
declared_index = set(self.declared_types)
for used_type in self.used_types:
delimiter = used_type.rfind(";")
used_type_with_scope = ""
if delimiter != -1:
used_type_with_scope = used_type[:delimiter] + "." + used_type[delimiter + 1 :]
used_type = used_type[delimiter + 1 :]

if not (
used_type in DependenciesHardcoded.index
or KnownDependency.index_simple.get(used_type) is not None
or KnownDependency.index.get(used_type) is not None
or used_type in declared_index
or (delimiter != -1 and used_type_with_scope in declared_index)
or "." + used_type in declared_index
):
return DependencyVerifierResult(False, f"type {used_type} is not defined")

return DependencyVerifierResult(True)


def _process_one_of(verifier: ProtobufDependencyVerifier, package_name: str, parent_name: str, one_of: OneOfElement) -> None:
parent = package_name + "." + parent_name
for field in one_of.fields:
verifier.add_used_type(parent, field.element_type)
4 changes: 4 additions & 0 deletions karapace/protobuf/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class ProtobufTypeException(Error):
"""Generic Protobuf type error."""


class ProtobufUnresolvedDependencyException(ProtobufException):
"""a Protobuf schema has unresolved dependency"""


class SchemaParseException(ProtobufException):
"""Error while parsing a Protobuf schema descriptor."""

Expand Down
7 changes: 7 additions & 0 deletions karapace/protobuf/field_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,17 @@ def compare_message(

self_type_record = types.get_self_type(self_type)
other_type_record = types.get_other_type(other_type)

self_type_element: MessageElement = self_type_record.type_element
other_type_element: MessageElement = other_type_record.type_element

if types.self_type_short_name(self_type) != types.other_type_short_name(other_type):
result.add_modification(Modification.FIELD_NAME_ALTER)
else:
self_type_element.compare(other_type_element, result, types)

def __repr__(self):
return f"{self.element_type} {self.name} = {self.tag}"

def __str__(self):
return f"{self.element_type} {self.name} = {self.tag}"
Loading