wrapper class for top-level arrays
Signed-off-by: Clemens Vasters <clemens@vasters.com>
clemensv committed May 14, 2024
1 parent f5c5dab commit 941a089
Showing 4 changed files with 85 additions and 7 deletions.
50 changes: 50 additions & 0 deletions README.md
@@ -131,6 +131,7 @@ Converting to Avro Schema:
- [`avrotize j2a`](#convert-json-schema-to-avro-schema) - Convert JSON schema to Avro schema.
- [`avrotize x2a`](#convert-xml-schema-xsd-to-avro-schema) - Convert XML schema to Avro schema.
- [`avrotize asn2a`](#convert-asn1-schema-to-avro-schema) - Convert ASN.1 to Avro schema.
- [`avrotize k2a`](#convert-kusto-table-definition-to-avro-schema) - Convert Kusto table definitions to Avro schema.

Converting from Avro Schema:

@@ -337,6 +338,55 @@ Conversion notes:
to parse all ASN.1 files. The tool is based on the Python asn1tools package and
is limited to that package's capabilities.

### Convert Kusto table definition to Avro schema

```bash
avrotize k2a --kusto-uri <kusto_cluster_uri> --kusto-database <kusto_database> --avsc <path_to_avro_schema_file> --emit-cloudevents-xregistry
```

Parameters:

- `--kusto-uri`: The URI of the Kusto cluster to connect to.
- `--kusto-database`: The name of the Kusto database to read the table definitions from.
- `--avsc`: The path to the Avro schema file to write the conversion result to.
- `--emit-cloudevents-xregistry`: (optional) See discussion below.

Conversion notes:
- The tool directly connects to the Kusto cluster and reads the table
definitions from the specified database. The tool will convert all tables in
the database to Avro record types, returned in a top-level type union.
- Connecting to the Kusto cluster uses the same authentication mechanisms as the
Azure CLI: if the Azure CLI is installed and authenticated, the tool will reuse
its authentication context (see the connection sketch after this list).
- The tool will map the Kusto column types to Avro types as follows (see also the
mapping sketch after this list):
- `bool` is mapped to Avro boolean type.
- `datetime` is mapped to Avro long type with logical type `timestamp-millis`.
- `decimal` is mapped to a logical Avro type with the `logicalType` set to `decimal`
and the `precision` and `scale` set to the values of the `decimal` type in Kusto.
- `guid` is mapped to Avro string type.
- `int` is mapped to Avro int type.
- `long` is mapped to Avro long type.
- `real` is mapped to Avro double type.
- `string` is mapped to Avro string type.
- `timespan` is mapped to a logical Avro type with the `logicalType` set to
`duration`.
- For `dynamic` columns, the tool will sample the data in the table to determine
the structure of the dynamic column. The tool will map the dynamic column to an
Avro record type with fields that correspond to the fields found in the dynamic
column. If the dynamic column contains nested dynamic columns, the tool will
recursively map those to Avro record types. If records with conflicting
structures are found in the dynamic column, the tool will emit a union of record
types for the dynamic column.
- If the `--emit-cloudevents-xregistry` option is set, the tool will emit an
[xRegistry](http://xregistry.io) registry manifest file with a CloudEvent
message definition for each table in the Kusto database and a separate Avro
Schema for each table in the embedded schema registry. If one or more tables
are found to contain CloudEvent data (as indicated by the presence of the
CloudEvents attribute columns), the tool will inspect the content of the
`type` (or `__type`) columns to determine which CloudEvent types
have been stored in the table and will emit a CloudEvent definition and schema
for each unique type.
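
The connection described in the notes above could be established roughly as in the
following sketch. It uses the `azure-kusto-data` package that this commit adds as a
dependency; the cluster URI and database name are placeholders, and the actual calls
in `avrotize/kustotoavro.py` may differ.

```python
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholder values; substitute your own cluster URI and database name.
cluster_uri = "https://mycluster.westeurope.kusto.windows.net"
database = "mydatabase"

# Reuse the Azure CLI's authentication context (requires a prior `az login`).
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster_uri)
client = KustoClient(kcsb)

# List the tables whose definitions would be converted to Avro record types.
response = client.execute_mgmt(database, ".show tables")
for row in response.primary_results[0]:
    print(row["TableName"])
```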

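For illustration, the scalar type mapping above boils down to a lookup like the one
in `avrotize/kustotoavro.py` (shown in the diff below). This is a simplified sketch:
anything not listed, including `guid`, falls back to the Avro `string` type, and
`dynamic` columns are handled separately by sampling.

```python
def map_kusto_scalar_type(kusto_type: str):
    """Simplified Kusto-to-Avro scalar type lookup (dynamic columns are sampled instead)."""
    return {
        "int": "int",
        "long": "long",
        "string": "string",
        "real": "double",
        "bool": "boolean",
        "datetime": {"type": "long", "logicalType": "timestamp-millis"},
        "timespan": {"type": "fixed", "size": 12, "logicalType": "duration"},
        "decimal": {"type": "fixed", "size": 16, "precision": 38, "logicalType": "decimal"},
    }.get(kusto_type, "string")  # guid and unknown types map to string
```
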
### Convert Avro schema to Kusto table declaration

```bash
38 changes: 32 additions & 6 deletions avrotize/kustotoavro.py
@@ -8,7 +8,7 @@
from avrotize.common import get_tree_hash
from avrotize.constants import AVRO_VERSION

JsonNode = Dict[str, 'JsonNode'] | List['JsonNode'] | str | None
JsonNode = Dict[str, 'JsonNode'] | List['JsonNode'] | str | bool | None


class KustoToAvro:
@@ -188,10 +188,14 @@ def map_kusto_type_to_avro_type(self, kusto_type, table_name, column_name, type_
if kusto_type == "dynamic":
return self.infer_dynamic_schema(table_name, column_name, type_column, type_value)
return str({
"int": "int", "long": "long", "string": "string",
"real": "double", "bool": "boolean",
"datetime": "string", "timespan": "string",
"decimal": {"type": "bytes", "logicalType": "decimal"},
"int": "int",
"long": "long",
"string": "string",
"real": "double",
"bool": "boolean",
"datetime": {"type": "long", "logicalType": "timestamp-millis"},
"timespan": {"type": "fixed", "size": 12, "logicalType": "duration"},
"decimal": {"type": "fixed", "size": 16, "precision": 38, "logicalType": "decimal"},
"dynamic": "bytes"
}.get(kusto_type, "string"))

@@ -237,6 +241,8 @@ def kusto_to_avro_schema(self, kusto_schema: dict, table_name: str) -> JsonNode:
data_schemas = [data_schemas]
if isinstance(data_schemas, list):
for schema in data_schemas:
if not isinstance(schema, dict) or "type" not in schema or schema["type"] != "record":
schema = self.wrap_schema_in_root_record(schema, type_name_name, type_namespace)
ce_attribs: Dict[str, JsonNode] ={}
for col in [col for col in kusto_schema['OrderedColumns'] if col['Name'].lstrip('_') != 'data']:
ce_attribs[col['Name'].lstrip('_')] = "string"
@@ -248,6 +254,8 @@ def kusto_to_avro_schema(self, kusto_schema: dict, table_name: str) -> JsonNode:
for column in kusto_schema['OrderedColumns']:
avro_type = self.map_kusto_type_to_avro_type(
column['CslType'], table_name, column['Name'], type_column, type_value)
if not isinstance(avro_type, dict) or "type" not in avro_type or avro_type["type"] != "record":
avro_type = self.wrap_schema_in_root_record(avro_type, type_name_name, type_namespace)
field: Dict[str, JsonNode] = {"name": column['Name'], "type": avro_type}
doc: JsonNode = column.get('DocString', '')
if doc:
Expand All @@ -263,6 +271,24 @@ def kusto_to_avro_schema(self, kusto_schema: dict, table_name: str) -> JsonNode:

return schemas if len(schemas) > 1 else schemas[0]


def wrap_schema_in_root_record(self, schema: JsonNode, type_name: str, type_namespace: str):
""" Wraps a schema in a root record."""
record: Dict[str, JsonNode] = {
"type": "record",
"name": type_name,
"fields": [
{
"name": "data",
"type": schema,
"root": True
}
]
}
if type_namespace:
record["namespace"] = type_namespace
return record
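
For illustration, the new helper above would wrap a hypothetical top-level array
schema like this (the type name and namespace are made up, and `converter` stands
for a `KustoToAvro` instance):

```python
# converter.wrap_schema_in_root_record({"type": "array", "items": "string"},
#                                      "MyTable", "com.example")
# is expected to return:
expected = {
    "type": "record",
    "name": "MyTable",
    "fields": [
        {
            "name": "data",
            "type": {"type": "array", "items": "string"},
            "root": True,
        }
    ],
    "namespace": "com.example",
}
```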

def apply_schema_attributes(self, schema, kusto_schema, table_name, type_value, type_namespace):
""" Applies schema attributes to the schema."""
if isinstance(schema, dict):
@@ -372,7 +398,7 @@ def process_all_tables(self):
continue
xregistry_messages[schemaid]["metadata"][key] = {
"type": value,
"required": "true"
"required": True
}
output = {
"messagegroups": {
1 change: 1 addition & 0 deletions pyproject.toml
@@ -25,6 +25,7 @@ dependencies = [
"jsonpath-ng>=1.6.1",
"jsoncomparison>=1.1.0",
"requests>=2.31.0",
"azure-kusto-data>=4.4.1",
]
dev-dependencies = [
"pytest>=7.2.1",
3 changes: 2 additions & 1 deletion requirements.txt
@@ -25,4 +25,5 @@ pylint==3.1.0
dataclasses==0.6
dataclasses_json==0.6.4
pydantic==2.7.1
avro==1.11.3
avro==1.11.3
azure-kusto-data==4.4.1
