Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
Support mapping complex types (#43)
Browse files Browse the repository at this point in the history
* Support mapping complex types

* Fix logs

---------

Co-authored-by: Mateusz Kulas <mkulas@provectus.com>
  • Loading branch information
m-qlas and Mateusz Kulas committed Jul 20, 2023
1 parent 927e89b commit a541284
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 79 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ To learn more about collector types and ODD Platform's architecture, [read the d

## Implemented adapters
| Service | Config example |
| ------------------------------------------------------------ | ----------------------------------------------------- |
|--------------------------------------------------------------|-------------------------------------------------------|
| Athena <a name="athena"></a> | [config](config_examples/athena.yaml) |
| DynamoDB <a name="dynamodb"></a> | [config](config_examples/dynamodb.yaml) |
| Glue <a name="glue"></a> | [config](config_examples/glue.yaml) |
| Kinesis <a name="kinesis"></a> | [config](config_examples/kinesis.yaml) |
| Quicksight <a name="quicksight"></a> | [config](config_examples/quicksight.yaml) |
| S3 <a name="s3"></a> | [config](config_examples/s3.yaml) |
| S3_Delta <a name="s3_delta"></a> | [config](config_examples/s3_delta.yaml) |
| Sagemaker <a name="sagemaker"></a> | [config](config_examples/sagemaker.yaml) |
| SQS <a name="sqs"></a> | [config](config_examples/sqs.yaml) |
| SagemakerFeaturestore <a name="sagemaker-featurestore"></a> | [config](config_examples/sagemaker_featurestore.yaml) |
Expand Down
7 changes: 5 additions & 2 deletions config_examples/s3_delta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ plugins:
delta_tables:
- bucket: bucket
prefix: delta_data
object_filter: # will exclude all folders with _pii at the end of name from ingestion
filter: # will include only tables with "events" and exclude all tables with "_pii" in the table prefix and name
exclude:
- ".*_pii"
include: [ "events" ]
exclude: [ "_pii" ]



# Minio S3 Delta Lake Adapter
default_pulling_interval: 10
Expand Down
11 changes: 11 additions & 0 deletions odd_collector_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from odd_collector_aws.logger import logger


def get_version() -> str:
try:
from odd_collector_aws.__version__ import VERSION

return VERSION
except Exception as e:
logger.warning(f"Can't get version from odd_collector_aws.__version__. {e}")
return "-"
6 changes: 6 additions & 0 deletions odd_collector_aws/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from pathlib import Path

import odd_collector_sdk as sdk
from odd_collector_sdk.collector import Collector

from odd_collector_aws import get_version
from odd_collector_aws.domain.plugin import PLUGIN_FACTORY
from odd_collector_aws.logger import logger

COLLECTOR_PACKAGE = __package__
CONFIG_PATH = Path().cwd() / "collector_config.yaml"

logger.info(f"AWS collector version: {get_version()}")
logger.info(f"SDK: {sdk.get_version()}")

collector = Collector(
config_path=CONFIG_PATH,
root_package=COLLECTOR_PACKAGE,
Expand Down
1 change: 1 addition & 0 deletions odd_collector_aws/__version__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
VERSION = "0.1.4"
1 change: 0 additions & 1 deletion odd_collector_aws/adapters/s3_delta/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from .mappers.delta_table import map_delta_table

# TODO: Add tags
# TODO: Complex types types


class Adapter(BaseAdapter):
Expand Down
5 changes: 3 additions & 2 deletions odd_collector_aws/adapters/s3_delta/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


def handle_values(
obj: dict, handler: tuple[str, callable]
obj: dict, handler: tuple[str, callable]
) -> tuple[str, Optional[any]]:
key, callback = handler
return key, silent(callback)(obj.get(key))
Expand Down Expand Up @@ -76,8 +76,9 @@ def handle_folder(self, config: DeltaTableConfig) -> Iterable[DTable]:

folders = filter(lambda obj: not obj.is_file, objects)
allowed = filter(lambda folder: folder.base_name, folders)
filtered = filter(lambda item: config.allow(item.path), allowed)

for obj in allowed:
for obj in filtered:
new_config = config.append_prefix(obj.base_name)
yield from self.get_table(new_config)

Expand Down
15 changes: 8 additions & 7 deletions odd_collector_aws/adapters/s3_delta/mappers/delta_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from oddrn_generator import S3Generator

from odd_collector_aws.utils.parse_s3_url import parse_s3_url
from ..models.field import DField

from ..models.table import DTable
from .field import map_field
Expand All @@ -10,9 +11,14 @@

def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity:
bucket, key = parse_s3_url(delta_table.table_uri)

fields = [DField(field) for field in delta_table.schema.fields]
generator.set_oddrn_paths(buckets=bucket, keys=key)

field_list = []
for field in fields:
processed_ds_fields = map_field(generator, field)
field_list.extend(processed_ds_fields)

return DataEntity(
oddrn=generator.get_oddrn_by_path("keys"),
name=key,
Expand All @@ -21,10 +27,5 @@ def map_delta_table(generator: S3Generator, delta_table: DTable) -> DataEntity:
updated_at=delta_table.updated_at,
owner=None,
metadata=map_metadata(delta_table),
dataset=DataSet(
rows_number=delta_table.num_rows,
field_list=[
map_field(generator, field) for field in delta_table.schema.fields
],
),
dataset=DataSet(rows_number=delta_table.num_rows, field_list=field_list),
)
86 changes: 32 additions & 54 deletions odd_collector_aws/adapters/s3_delta/mappers/field.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,38 @@
from deltalake._internal import ArrayType, Field, MapType, PrimitiveType, StructType
from odd_models.models import DataSetField, DataSetFieldType, Type
from pathlib import Path
from odd_collector_sdk.grammar_parser.build_dataset_field import DatasetFieldBuilder
from odd_models.models import DataSetField, Type
from oddrn_generator import S3Generator

from ..logger import logger

DELTA_TO_ODD_TYPE_MAP = {
from ..models.field import DField

DELTA_TO_ODD_TYPE_MAP: dict[str, Type] = {
"float": Type.TYPE_NUMBER,
"struct": Type.TYPE_STRUCT,
"bigint": Type.TYPE_INTEGER,
"binary": Type.TYPE_BINARY,
"boolean": Type.TYPE_BOOLEAN,
"date": Type.TYPE_DATETIME,
"decimal": Type.TYPE_NUMBER,
"double": Type.TYPE_NUMBER,
"void": Type.TYPE_UNKNOWN,
"smallint": Type.TYPE_INTEGER,
"timestamp": Type.TYPE_TIME,
"tinyint": Type.TYPE_INTEGER,
"array": Type.TYPE_LIST,
"map": Type.TYPE_MAP,
"int": Type.TYPE_INTEGER,
"long": Type.TYPE_INTEGER,
"string": Type.TYPE_STRING,
"interval": Type.TYPE_DURATION,
}


def map_to_odd_type(delta_type: str) -> Type:
return DELTA_TO_ODD_TYPE_MAP.get(delta_type, Type.TYPE_UNKNOWN)


def unknown_field(generator: S3Generator, field: Field) -> DataSetField:
generator.set_oddrn_paths(columns=field.name)
return DataSetField(
oddrn=generator.get_oddrn_by_path("columns"),
name=field.name,
type=DataSetFieldType(
type=Type.TYPE_UNKNOWN,
logical_type=field.type,
is_nullable=field.nullable,
),
def map_field(oddrn_generator: S3Generator, column: DField) -> list[DataSetField]:
field_builder = DatasetFieldBuilder(
data_source="s3_delta",
oddrn_generator=oddrn_generator,
parser_config_path=Path(
"odd_collector_aws/adapters/s3_delta/mappers/grammar/field_types.lark"
).absolute(),
odd_types_map=DELTA_TO_ODD_TYPE_MAP,
)


def map_primitive(generator: S3Generator, field: Field) -> DataSetField:
generator.set_oddrn_paths(columns=field.name)
return DataSetField(
oddrn=generator.get_oddrn_by_path("columns"),
name=field.name,
type=DataSetFieldType(
type=map_to_odd_type(field.type.type),
logical_type=field.type.type,
is_nullable=field.nullable,
),
)


def map_map(generator: S3Generator, field: Field) -> DataSetField:
...


def map_struct(generator: S3Generator, field: Field) -> DataSetField:
...


def map_array(generator: S3Generator, field: Field) -> DataSetField:
...


def map_field(generator: S3Generator, field: Field) -> DataSetField:
type_ = field.type

if isinstance(type_, PrimitiveType):
return map_primitive(generator, field)
else:
logger.error(f"Unknown field type: {field.type}")
return field_builder.build_dataset_field(column)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
?start: type

BASIC_TYPE: "string"
| "int"
| "float"
| "double"
| "bigint"
| "binary"
| "boolean"
| "date"
| "decimal"
| "void"
| "interval"
| "smallint"
| "timestamp"
| "timestamp_ntz"
| "tinyint"
| "long"

primitive_type: "PrimitiveType(\"" BASIC_TYPE "\")"

FIELD_NAME: (LETTER | "_") (LETTER | "_" | DIGIT | "-")*

array: "ArrayType" "(" type ", " "contains_null=" BOOL ")"

field: "Field(" FIELD_NAME ", " type ", " "nullable=" BOOL ")"

struct: "StructType([" [field (", " field)*] "])"

map: "MapType(" type ", " type ", " "value_contains_null=" BOOL ")"

?type: primitive_type
| array
| struct
| map

BOOL: "True" | "False"

%import common.LETTER
%import common.DIGIT
25 changes: 25 additions & 0 deletions odd_collector_aws/adapters/s3_delta/models/field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Union

from deltalake import Field
from deltalake._internal import PrimitiveType, ArrayType, MapType, StructType


class DField:
def __init__(self, field: Field):
self.field = field

@property
def odd_metadata(self) -> dict:
return self.field.metadata

@property
def name(self) -> str:
return self.field.name

@property
def type(self) -> Union[PrimitiveType, ArrayType, MapType, StructType]:
return self.field.type

@property
def nullable(self) -> bool:
return self.nullable
6 changes: 3 additions & 3 deletions odd_collector_aws/domain/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DeltaTableConfig(BaseModel):
scheme: str = Field(default="s3", alias="schema")
bucket: str
prefix: str
object_filter: Optional[Filter] = Filter()
filter: Optional[Filter] = Filter()

@property
def path(self) -> str:
Expand All @@ -56,11 +56,11 @@ def append_prefix(self, path: str) -> "DeltaTableConfig":
schema=self.scheme,
bucket=self.bucket,
prefix=f"{self.prefix}/{path}",
object_filter=self.object_filter,
filter=self.filter,
)

def allow(self, name: str) -> bool:
return self.object_filter.is_allowed(name)
return self.filter.is_allowed(name)


class S3DeltaPlugin(AwsPlugin):
Expand Down
Loading

0 comments on commit a541284

Please sign in to comment.