From a541284bc675306f941a0121745608506c8442d4 Mon Sep 17 00:00:00 2001 From: Mateusz Kulas <60038363+m-qlas@users.noreply.github.com> Date: Thu, 20 Jul 2023 09:49:59 +0200 Subject: [PATCH] Support mapping complex types (#43) * Support mapping complex types * Fix logs --------- Co-authored-by: Mateusz Kulas --- README.md | 3 +- config_examples/s3_delta.yaml | 7 +- odd_collector_aws/__init__.py | 11 +++ odd_collector_aws/__main__.py | 6 ++ odd_collector_aws/__version__.py | 1 + .../adapters/s3_delta/adapter.py | 1 - odd_collector_aws/adapters/s3_delta/client.py | 5 +- .../adapters/s3_delta/mappers/delta_table.py | 15 ++-- .../adapters/s3_delta/mappers/field.py | 86 +++++++------------ .../s3_delta/mappers/grammar/field_types.lark | 40 +++++++++ .../adapters/s3_delta/models/field.py | 25 ++++++ odd_collector_aws/domain/plugin.py | 6 +- poetry.lock | 47 ++++++++-- pyproject.toml | 4 +- 14 files changed, 178 insertions(+), 79 deletions(-) create mode 100644 odd_collector_aws/__version__.py create mode 100644 odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark create mode 100644 odd_collector_aws/adapters/s3_delta/models/field.py diff --git a/README.md b/README.md index cc7f600..5368183 100644 --- a/README.md +++ b/README.md @@ -14,13 +14,14 @@ To learn more about collector types and ODD Platform's architecture, [read the d ## Implemented adapters | Service | Config example | -| ------------------------------------------------------------ | ----------------------------------------------------- | +|--------------------------------------------------------------|-------------------------------------------------------| | Athena | [config](config_examples/athena.yaml) | | DynamoDB | [config](config_examples/dynamodb.yaml) | | Glue | [config](config_examples/glue.yaml) | | Kinesis | [config](config_examples/kinesis.yaml) | | Quicksight | [config](config_examples/quicksight.yaml) | | S3 | [config](config_examples/s3.yaml) | +| S3_Delta | [config](config_examples/s3_delta.yaml) | | Sagemaker | [config](config_examples/sagemaker.yaml) | | SQS | [config](config_examples/sqs.yaml) | | SagemakerFeaturestore | [config](config_examples/sagemaker_featurestore.yaml) | diff --git a/config_examples/s3_delta.yaml b/config_examples/s3_delta.yaml index f9d4db1..cb6f6f2 100644 --- a/config_examples/s3_delta.yaml +++ b/config_examples/s3_delta.yaml @@ -27,9 +27,12 @@ plugins: delta_tables: - bucket: bucket prefix: delta_data - object_filter: # will exclude all folders with _pii at the end of name from ingestion + filter: # will include only tables with "events" and exclude all tables with "_pii" in the table prefix and name exclude: - - ".*_pii" + include: [ "events" ] + exclude: [ "_pii" ] + + # Minio S3 Delta Lake Adapter default_pulling_interval: 10 diff --git a/odd_collector_aws/__init__.py b/odd_collector_aws/__init__.py index e69de29..fabaf86 100644 --- a/odd_collector_aws/__init__.py +++ b/odd_collector_aws/__init__.py @@ -0,0 +1,11 @@ +from odd_collector_aws.logger import logger + + +def get_version() -> str: + try: + from odd_collector_aws.__version__ import VERSION + + return VERSION + except Exception as e: + logger.warning(f"Can't get version from odd_collector_aws.__version__. {e}") + return "-" diff --git a/odd_collector_aws/__main__.py b/odd_collector_aws/__main__.py index cc6ab2f..acc3ab5 100644 --- a/odd_collector_aws/__main__.py +++ b/odd_collector_aws/__main__.py @@ -1,12 +1,18 @@ from pathlib import Path +import odd_collector_sdk as sdk from odd_collector_sdk.collector import Collector +from odd_collector_aws import get_version from odd_collector_aws.domain.plugin import PLUGIN_FACTORY +from odd_collector_aws.logger import logger COLLECTOR_PACKAGE = __package__ CONFIG_PATH = Path().cwd() / "collector_config.yaml" +logger.info(f"AWS collector version: {get_version()}") +logger.info(f"SDK: {sdk.get_version()}") + collector = Collector( config_path=CONFIG_PATH, root_package=COLLECTOR_PACKAGE, diff --git a/odd_collector_aws/__version__.py b/odd_collector_aws/__version__.py new file mode 100644 index 0000000..ab1bdd5 --- /dev/null +++ b/odd_collector_aws/__version__.py @@ -0,0 +1 @@ +VERSION = "0.1.4" diff --git a/odd_collector_aws/adapters/s3_delta/adapter.py b/odd_collector_aws/adapters/s3_delta/adapter.py index 5855eed..65900cd 100644 --- a/odd_collector_aws/adapters/s3_delta/adapter.py +++ b/odd_collector_aws/adapters/s3_delta/adapter.py @@ -13,7 +13,6 @@ from .mappers.delta_table import map_delta_table # TODO: Add tags -# TODO: Complex types types class Adapter(BaseAdapter): diff --git a/odd_collector_aws/adapters/s3_delta/client.py b/odd_collector_aws/adapters/s3_delta/client.py index 878428e..043f61c 100644 --- a/odd_collector_aws/adapters/s3_delta/client.py +++ b/odd_collector_aws/adapters/s3_delta/client.py @@ -15,7 +15,7 @@ def handle_values( - obj: dict, handler: tuple[str, callable] + obj: dict, handler: tuple[str, callable] ) -> tuple[str, Optional[any]]: key, callback = handler return key, silent(callback)(obj.get(key)) @@ -76,8 +76,9 @@ def handle_folder(self, config: DeltaTableConfig) -> Iterable[DTable]: folders = filter(lambda obj: not obj.is_file, objects) allowed = filter(lambda folder: folder.base_name, folders) + filtered = filter(lambda item: config.allow(item.path), allowed) - for obj in allowed: + for obj in filtered: new_config = config.append_prefix(obj.base_name) yield from self.get_table(new_config) diff --git a/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py b/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py index 7735a8e..c3895b5 100644 --- a/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py +++ b/odd_collector_aws/adapters/s3_delta/mappers/delta_table.py @@ -2,6 +2,7 @@ from oddrn_generator import S3Generator from odd_collector_aws.utils.parse_s3_url import parse_s3_url +from ..models.field import DField from ..models.table import DTable from .field import map_field @@ -10,9 +11,14 @@ def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity: bucket, key = parse_s3_url(delta_table.table_uri) - + fields = [DField(field) for field in delta_table.schema.fields] generator.set_oddrn_paths(buckets=bucket, keys=key) + field_list = [] + for field in fields: + processed_ds_fields = map_field(generator, field) + field_list.extend(processed_ds_fields) + return DataEntity( oddrn=generator.get_oddrn_by_path("keys"), name=key, @@ -21,10 +27,5 @@ def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity: updated_at=delta_table.updated_at, owner=None, metadata=map_metadata(delta_table), - dataset=DataSet( - rows_number=delta_table.num_rows, - field_list=[ - map_field(generator, field) for field in delta_table.schema.fields - ], - ), + dataset=DataSet(rows_number=delta_table.num_rows, field_list=field_list), ) diff --git a/odd_collector_aws/adapters/s3_delta/mappers/field.py b/odd_collector_aws/adapters/s3_delta/mappers/field.py index 4181355..443c070 100644 --- a/odd_collector_aws/adapters/s3_delta/mappers/field.py +++ b/odd_collector_aws/adapters/s3_delta/mappers/field.py @@ -1,60 +1,38 @@ -from deltalake._internal import ArrayType, Field, MapType, PrimitiveType, StructType -from odd_models.models import DataSetField, DataSetFieldType, Type +from pathlib import Path +from odd_collector_sdk.grammar_parser.build_dataset_field import DatasetFieldBuilder +from odd_models.models import DataSetField, Type from oddrn_generator import S3Generator - -from ..logger import logger - -DELTA_TO_ODD_TYPE_MAP = { +from ..models.field import DField + +DELTA_TO_ODD_TYPE_MAP: dict[str, Type] = { + "float": Type.TYPE_NUMBER, + "struct": Type.TYPE_STRUCT, + "bigint": Type.TYPE_INTEGER, + "binary": Type.TYPE_BINARY, + "boolean": Type.TYPE_BOOLEAN, + "date": Type.TYPE_DATETIME, + "decimal": Type.TYPE_NUMBER, + "double": Type.TYPE_NUMBER, + "void": Type.TYPE_UNKNOWN, + "smallint": Type.TYPE_INTEGER, + "timestamp": Type.TYPE_TIME, + "tinyint": Type.TYPE_INTEGER, + "array": Type.TYPE_LIST, + "map": Type.TYPE_MAP, + "int": Type.TYPE_INTEGER, + "long": Type.TYPE_INTEGER, "string": Type.TYPE_STRING, + "interval": Type.TYPE_DURATION, } -def map_to_odd_type(delta_type: str) -> Type: - return DELTA_TO_ODD_TYPE_MAP.get(delta_type, Type.TYPE_UNKNOWN) - - -def unknown_field(generator: S3Generator, field: Field) -> DataSetField: - generator.set_oddrn_paths(columns=field.name) - return DataSetField( - oddrn=generator.get_oddrn_by_path("columns"), - name=field.name, - type=DataSetFieldType( - type=Type.TYPE_UNKNOWN, - logical_type=field.type, - is_nullable=field.nullable, - ), +def map_field(oddrn_generator: S3Generator, column: DField) -> list[DataSetField]: + field_builder = DatasetFieldBuilder( + data_source="s3_delta", + oddrn_generator=oddrn_generator, + parser_config_path=Path( + "odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark" + ).absolute(), + odd_types_map=DELTA_TO_ODD_TYPE_MAP, ) - - -def map_primitive(generator: S3Generator, field: Field) -> DataSetField: - generator.set_oddrn_paths(columns=field.name) - return DataSetField( - oddrn=generator.get_oddrn_by_path("columns"), - name=field.name, - type=DataSetFieldType( - type=map_to_odd_type(field.type.type), - logical_type=field.type.type, - is_nullable=field.nullable, - ), - ) - - -def map_map(generator: S3Generator, field: Field) -> DataSetField: - ... - - -def map_struct(generator: S3Generator, field: Field) -> DataSetField: - ... - - -def map_array(generator: S3Generator, field: Field) -> DataSetField: - ... - - -def map_field(generator: S3Generator, field: Field) -> DataSetField: - type_ = field.type - - if isinstance(type_, PrimitiveType): - return map_primitive(generator, field) - else: - logger.error(f"Unknown field type: {field.type}") + return field_builder.build_dataset_field(column) diff --git a/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark b/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark new file mode 100644 index 0000000..4b680b2 --- /dev/null +++ b/odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark @@ -0,0 +1,40 @@ +?start: type + +BASIC_TYPE: "string" + | "int" + | "float" + | "double" + | "bigint" + | "binary" + | "boolean" + | "date" + | "decimal" + | "void" + | "interval" + | "smallint" + | "timestamp" + | "timestamp_ntz" + | "tinyint" + | "long" + +primitive_type: "PrimitiveType(\"" BASIC_TYPE "\")" + +FIELD_NAME: (LETTER | "_") (LETTER | "_" | DIGIT | "-")* + +array: "ArrayType" "(" type ", " "contains_null=" BOOL ")" + +field: "Field(" FIELD_NAME ", " type ", " "nullable=" BOOL ")" + +struct: "StructType([" [field (", " field)*] "])" + +map: "MapType(" type ", " type ", " "value_contains_null=" BOOL ")" + +?type: primitive_type + | array + | struct + | map + +BOOL: "True" | "False" + +%import common.LETTER +%import common.DIGIT diff --git a/odd_collector_aws/adapters/s3_delta/models/field.py b/odd_collector_aws/adapters/s3_delta/models/field.py new file mode 100644 index 0000000..ac76314 --- /dev/null +++ b/odd_collector_aws/adapters/s3_delta/models/field.py @@ -0,0 +1,25 @@ +from typing import Union + +from deltalake import Field +from deltalake._internal import PrimitiveType, ArrayType, MapType, StructType + + +class DField: + def __init__(self, field: Field): + self.field = field + + @property + def odd_metadata(self) -> dict: + return self.field.metadata + + @property + def name(self) -> str: + return self.field.name + + @property + def type(self) -> Union[PrimitiveType, ArrayType, MapType, StructType]: + return self.field.type + + @property + def nullable(self) -> bool: + return self.nullable diff --git a/odd_collector_aws/domain/plugin.py b/odd_collector_aws/domain/plugin.py index 5b3a96a..2c65ef2 100644 --- a/odd_collector_aws/domain/plugin.py +++ b/odd_collector_aws/domain/plugin.py @@ -45,7 +45,7 @@ class DeltaTableConfig(BaseModel): scheme: str = Field(default="s3", alias="schema") bucket: str prefix: str - object_filter: Optional[Filter] = Filter() + filter: Optional[Filter] = Filter() @property def path(self) -> str: @@ -56,11 +56,11 @@ def append_prefix(self, path: str) -> "DeltaTableConfig": schema=self.scheme, bucket=self.bucket, prefix=f"{self.prefix}/{path}", - object_filter=self.object_filter, + filter=self.filter, ) def allow(self, name: str) -> bool: - return self.object_filter.is_allowed(name) + return self.filter.is_allowed(name) class S3DeltaPlugin(AwsPlugin): diff --git a/poetry.lock b/poetry.lock index aff72bd..4d2db7c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -874,24 +874,27 @@ files = [ [[package]] name = "odd-collector-sdk" -version = "0.3.31" +version = "0.3.40" description = "ODD Collector" category = "main" optional = false python-versions = ">=3.9,<4.0" files = [ - {file = "odd_collector_sdk-0.3.31-py3-none-any.whl", hash = "sha256:90fbec1dba972ad6f684d22bb56cbda6229580600cc7c1f08425e60ff4fb9ee6"}, - {file = "odd_collector_sdk-0.3.31.tar.gz", hash = "sha256:4ccd76659bf1b54a939544d1b73b901b8e4e9385f4b5d8272f56cc5cb0c0ae23"}, + {file = "odd_collector_sdk-0.3.40-py3-none-any.whl", hash = "sha256:da308c2accbfdb29089cf98dba3d73d796592f5c4dcf4cb9239f689b39481f67"}, + {file = "odd_collector_sdk-0.3.40.tar.gz", hash = "sha256:453461c5fa92e888324ffce0c48eadd9d8c4cfea4528d18412c5e6d74f4ede2c"}, ] [package.dependencies] aiohttp = ">=3.8.1,<4.0.0" APScheduler = ">=3.8.1,<4.0.0" +flatdict = ">=4.0.1,<5.0.0" funcy = ">=2.0,<3.0" importlib-metadata = ">=5.1.0,<6.0.0" +lark-parser = ">=0.12.0,<0.13.0" loguru = ">=0.6.0,<0.7.0" odd-models = ">=2.0.28,<3.0.0" oddrn-generator = ">=0.1.73,<0.2.0" +prettytable = ">=3.8.0,<4.0.0" pyaml-env = ">=1.1.5,<2.0.0" pydantic = ">=1.8.2,<2.0.0" tqdm = ">=4.64.1,<5.0.0" @@ -919,14 +922,14 @@ sqlparse = "0.4.2" [[package]] name = "oddrn-generator" -version = "0.1.81" +version = "0.1.87" description = "Open Data Discovery Resource Name Generator" category = "main" optional = false python-versions = ">=3.9,<4.0" files = [ - {file = "oddrn_generator-0.1.81-py3-none-any.whl", hash = "sha256:188274459ba501db37087d88737018af1b13a69d9fa08cd258e665bdfc99caf3"}, - {file = "oddrn_generator-0.1.81.tar.gz", hash = "sha256:f9161901b4596e7262746f4d143791f07bf95b26bd0eacef5365be8c8fd82a6a"}, + {file = "oddrn_generator-0.1.87-py3-none-any.whl", hash = "sha256:b904efba2ec8e2b81bf7c6351387c83294da547c797af7c3e06a8033923f8780"}, + {file = "oddrn_generator-0.1.87.tar.gz", hash = "sha256:3d167c139a554428755bfc7a89cf9086b09e4a4e31ed8a39425bbecc96e0f641"}, ] [package.dependencies] @@ -1007,6 +1010,24 @@ nodeenv = ">=0.11.1" pyyaml = ">=5.1" virtualenv = ">=20.10.0" +[[package]] +name = "prettytable" +version = "3.8.0" +description = "A simple Python library for easily displaying tabular data in a visually appealing ASCII table format" +category = "main" +optional = false +python-versions = ">=3.8" +files = [ + {file = "prettytable-3.8.0-py3-none-any.whl", hash = "sha256:03481bca25ae0c28958c8cd6ac5165c159ce89f7ccde04d5c899b24b68bb13b7"}, + {file = "prettytable-3.8.0.tar.gz", hash = "sha256:031eae6a9102017e8c7c7906460d150b7ed78b20fd1d8c8be4edaf88556c07ce"}, +] + +[package.dependencies] +wcwidth = "*" + +[package.extras] +tests = ["pytest", "pytest-cov", "pytest-lazy-fixture"] + [[package]] name = "pyaml-env" version = "1.2.1" @@ -1452,6 +1473,18 @@ platformdirs = ">=3.2,<4" docs = ["furo (>=2023.3.27)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"] test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=67.7.1)", "time-machine (>=2.9)"] +[[package]] +name = "wcwidth" +version = "0.2.6" +description = "Measures the displayed width of unicode strings in a terminal" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "wcwidth-0.2.6-py2.py3-none-any.whl", hash = "sha256:795b138f6875577cd91bba52baf9e445cd5118fd32723b460e30a0af30ea230e"}, + {file = "wcwidth-0.2.6.tar.gz", hash = "sha256:a5220780a404dbe3353789870978e472cfe477761f06ee55077256e509b156d0"}, +] + [[package]] name = "win32-setctime" version = "1.1.0" @@ -1574,4 +1607,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "1877f1f3f8b978205762928ee49ab5b9c88ac29a05bde03f0ef3fe567356b2dc" +content-hash = "df2fee20a064e6051c501cbdcf46ee72b8911f518240c2d61c9bf0d5795bee9b" diff --git a/pyproject.toml b/pyproject.toml index e43756b..996d4bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,14 +9,14 @@ repository = "https://github.com/opendatadiscovery/odd-collector-aws" [tool.poetry.dependencies] python = "^3.9" -oddrn-generator = "^0.1.80" +oddrn-generator = "^0.1.87" python-dotenv = "0.19.0" boto3 = "1.18.44" pyhumps = "3.0.2" pyarrow = "^10.0.1" humps = "0.2.2" flatdict = "4.0.1" -odd-collector-sdk = "^0.3.31" +odd-collector-sdk = "^0.3.40" lark-parser = "^0.12.0" deltalake = "^0.9.0"